Merge branch 'master' into job_dependency

Conflicts:
	rq/job.py
	rq/queue.py
main
Selwin Ong 11 years ago
commit fcfe55fe13

10
.gitignore vendored

@ -1,5 +1,7 @@
*.pyc
dump.rdb
/*.egg-info
.env
/.tox/
*.egg-info
/dump.rdb
/.env
/.tox
/dist

@ -2,6 +2,7 @@ language: python
python:
- "2.6"
- "2.7"
- "3.3"
- "pypy"
install:
- if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install -r py26-requirements.txt; fi

@ -1,5 +1,35 @@
### 0.3.12
(not released yet)
- Ability to provide a custom job description (instead of using the default
function invocation hint). Thanks, İbrahim.
- Temporary key for the compact queue is now randomly generated, which should
avoid name clashes for concurrent compact actions.
### 0.3.11
(August 23th, 2013)
- Some more fixes in command line scripts for Python 3
### 0.3.10
(August 20th, 2013)
- Bug fix in setup.py
### 0.3.9
(August 20th, 2013)
- Python 3 compatibility (Thanks, Alex!)
- Minor bug fix where Sentry would break when func cannot be imported
### 0.3.8
(not yet released)
(June 17th, 2013)
- `rqworker` and `rqinfo` have a `--url` argument to connect to a Redis url.

BIN
dist/rq-0.3.6.tar.gz vendored

Binary file not shown.

@ -39,3 +39,33 @@ else:
opfunc.__doc__ = getattr(int, opname).__doc__
setattr(cls, opname, opfunc)
return cls
PY2 = sys.version_info[0] < 3
if PY2:
string_types = (str, unicode)
text_type = unicode
def as_text(v):
return v
def decode_redis_hash(h):
return h
else:
string_types = (str,)
text_type = str
def as_text(v):
if v is None:
return None
elif isinstance(v, bytes):
return v.decode('utf-8')
elif isinstance(v, str):
return v
else:
raise ValueError('Unknown type %r' % type(v))
def decode_redis_hash(h):
return dict((as_text(k), h[k]) for k in h)

@ -21,6 +21,7 @@ import logging.handlers
import re
import sys
import types
from rq.compat import string_types
IDENTIFIER = re.compile('^[a-z_][a-z0-9_]*$', re.I)
@ -230,7 +231,7 @@ class BaseConfigurator(object):
isinstance(value, tuple):
value = ConvertingTuple(value)
value.configurator = self
elif isinstance(value, basestring): # str for py3k
elif isinstance(value, string_types): # str for py3k
m = self.CONVERT_PATTERN.match(value)
if m:
d = m.groupdict()
@ -245,7 +246,7 @@ class BaseConfigurator(object):
def configure_custom(self, config):
"""Configure an object with a user-supplied factory."""
c = config.pop('()')
if not hasattr(c, '__call__') and hasattr(types, 'ClassType') and type(c) != types.ClassType:
if not hasattr(c, '__call__') and type(c) != type:
c = self.resolve(c)
props = config.pop('.', None)
# Check for valid identifiers
@ -296,21 +297,21 @@ class DictConfigurator(BaseConfigurator):
level = handler_config.get('level', None)
if level:
handler.setLevel(_checkLevel(level))
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure handler '
'%r: %s' % (name, e))
loggers = config.get('loggers', EMPTY_DICT)
for name in loggers:
try:
self.configure_logger(name, loggers[name], True)
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure logger '
'%r: %s' % (name, e))
root = config.get('root', None)
if root:
try:
self.configure_root(root, True)
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure root '
'logger: %s' % e)
else:
@ -325,7 +326,7 @@ class DictConfigurator(BaseConfigurator):
try:
formatters[name] = self.configure_formatter(
formatters[name])
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure '
'formatter %r: %s' % (name, e))
# Next, do filters - they don't refer to anything else, either
@ -333,7 +334,7 @@ class DictConfigurator(BaseConfigurator):
for name in filters:
try:
filters[name] = self.configure_filter(filters[name])
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure '
'filter %r: %s' % (name, e))
@ -346,7 +347,7 @@ class DictConfigurator(BaseConfigurator):
handler = self.configure_handler(handlers[name])
handler.name = name
handlers[name] = handler
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure handler '
'%r: %s' % (name, e))
# Next, do loggers - they refer to handlers and filters
@ -385,7 +386,7 @@ class DictConfigurator(BaseConfigurator):
existing.remove(name)
try:
self.configure_logger(name, loggers[name])
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure logger '
'%r: %s' % (name, e))
@ -408,7 +409,7 @@ class DictConfigurator(BaseConfigurator):
if root:
try:
self.configure_root(root)
except StandardError, e:
except Exception as e:
raise ValueError('Unable to configure root '
'logger: %s' % e)
finally:
@ -420,7 +421,7 @@ class DictConfigurator(BaseConfigurator):
factory = config['()'] # for use in exception handler
try:
result = self.configure_custom(config)
except TypeError, te:
except TypeError as te:
if "'format'" not in str(te):
raise
#Name of parameter changed from fmt to format.
@ -450,7 +451,7 @@ class DictConfigurator(BaseConfigurator):
for f in filters:
try:
filterer.addFilter(self.config['filters'][f])
except StandardError, e:
except Exception as e:
raise ValueError('Unable to add filter %r: %s' % (f, e))
def configure_handler(self, config):
@ -459,14 +460,14 @@ class DictConfigurator(BaseConfigurator):
if formatter:
try:
formatter = self.config['formatters'][formatter]
except StandardError, e:
except Exception as e:
raise ValueError('Unable to set formatter '
'%r: %s' % (formatter, e))
level = config.pop('level', None)
filters = config.pop('filters', None)
if '()' in config:
c = config.pop('()')
if not hasattr(c, '__call__') and hasattr(types, 'ClassType') and type(c) != types.ClassType:
if not hasattr(c, '__call__') and type(c) != type:
c = self.resolve(c)
factory = c
else:
@ -476,7 +477,7 @@ class DictConfigurator(BaseConfigurator):
'target' in config:
try:
config['target'] = self.config['handlers'][config['target']]
except StandardError, e:
except Exception as e:
raise ValueError('Unable to set target handler '
'%r: %s' % (config['target'], e))
elif issubclass(klass, logging.handlers.SMTPHandler) and\
@ -489,7 +490,7 @@ class DictConfigurator(BaseConfigurator):
kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
try:
result = factory(**kwargs)
except TypeError, te:
except TypeError as te:
if "'stream'" not in str(te):
raise
#The argument name changed from strm to stream
@ -511,7 +512,7 @@ class DictConfigurator(BaseConfigurator):
for h in handlers:
try:
logger.addHandler(self.config['handlers'][h])
except StandardError, e:
except Exception as e:
raise ValueError('Unable to add handler %r: %s' % (h, e))
def common_logger_config(self, logger, config, incremental=False):

@ -18,8 +18,8 @@ def Connection(connection=None):
finally:
popped = pop_connection()
assert popped == connection, \
'Unexpected Redis connection was popped off the stack. ' \
'Check your Redis connection setup.'
'Unexpected Redis connection was popped off the stack. ' \
'Check your Redis connection setup.'
def push_connection(redis):
@ -37,7 +37,7 @@ def use_connection(redis=None):
use of use_connection() and stacked connection contexts.
"""
assert len(_connection_stack) <= 1, \
'You should not mix Connection contexts with use_connection().'
'You should not mix Connection contexts with use_connection().'
release_local(_connection_stack)
if redis is None:
@ -61,13 +61,11 @@ def resolve_connection(connection=None):
connection = get_current_connection()
if connection is None:
raise NoRedisConnectionException(
'Could not resolve a Redis connection.')
raise NoRedisConnectionException('Could not resolve a Redis connection.')
return connection
_connection_stack = LocalStack()
__all__ = ['Connection',
'get_current_connection', 'push_connection', 'pop_connection',
'use_connection']
__all__ = ['Connection', 'get_current_connection', 'push_connection',
'pop_connection', 'use_connection']

@ -7,7 +7,7 @@ def register_sentry(client, worker):
exc_info=exc_info,
extra={
'job_id': job.id,
'func': job.func,
'func': job.func_name,
'args': job.args,
'kwargs': job.kwargs,
'description': job.description,

@ -2,11 +2,12 @@ from functools import wraps
from .queue import Queue
from .connections import resolve_connection
from .worker import DEFAULT_RESULT_TTL
from rq.compat import string_types
class job(object):
class job(object):
def __init__(self, queue, connection=None, timeout=None,
result_ttl=DEFAULT_RESULT_TTL):
result_ttl=DEFAULT_RESULT_TTL):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
@ -26,11 +27,11 @@ class job(object):
def __call__(self, f):
@wraps(f)
def delay(*args, **kwargs):
if isinstance(self.queue, basestring):
if isinstance(self.queue, string_types):
queue = Queue(name=self.queue, connection=self.connection)
else:
queue = self.queue
return queue.enqueue_call(f, args=args, kwargs=kwargs,
timeout=self.timeout, result_ttl=self.result_ttl)
timeout=self.timeout, result_ttl=self.result_ttl)
f.delay = delay
return f

@ -2,18 +2,23 @@ import importlib
import inspect
import times
from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError
try:
from cPickle import loads, dumps, UnpicklingError
except ImportError: # noqa
from pickle import loads, dumps, UnpicklingError # noqa
from .local import LocalStack
from .connections import resolve_connection
from .exceptions import UnpickleError, NoSuchJobError
from rq.compat import text_type, decode_redis_hash, as_text
def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
return type(name, (), values)
Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
Status = enum('Status',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
def unpickle(pickled_string):
@ -26,7 +31,7 @@ def unpickle(pickled_string):
"""
try:
obj = loads(pickled_string)
except (StandardError, UnpicklingError) as e:
except (Exception, UnpicklingError) as e:
raise UnpickleError('Could not unpickle.', pickled_string, e)
return obj
@ -64,7 +69,7 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, dependency=None):
result_ttl=None, status=None, description=None, dependency=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@ -76,7 +81,7 @@ class Job(object):
assert isinstance(kwargs, dict), '%r is not a valid kwargs dict.' % (kwargs,)
job = cls(connection=connection)
if inspect.ismethod(func):
job._instance = func.im_self
job._instance = func.__self__
job._func_name = func.__name__
elif inspect.isfunction(func) or inspect.isbuiltin(func):
job._func_name = '%s.%s' % (func.__module__, func.__name__)
@ -84,7 +89,7 @@ class Job(object):
job._func_name = func
job._args = args
job._kwargs = kwargs
job.description = job.get_call_string()
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job._status = status
# dependency could be job instance or id
@ -97,7 +102,7 @@ class Job(object):
return self._func_name
def _get_status(self):
self._status = self.connection.hget(self.key, 'status')
self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status
def _set_status(self, status):
@ -212,7 +217,7 @@ class Job(object):
first time the ID is requested.
"""
if self._id is None:
self._id = unicode(uuid4())
self._id = text_type(uuid4())
return self._id
def set_id(self, value):
@ -224,7 +229,7 @@ class Job(object):
@classmethod
def key_for(cls, job_id):
"""The Redis key that is used to store job hash under."""
return 'rq:job:%s' % (job_id,)
return b'rq:job:' + job_id.encode('utf-8')
@classmethod
def waitlist_key_for(cls, job_id):
@ -283,7 +288,7 @@ class Job(object):
Will raise a NoSuchJobError if no corresponding Redis key exists.
"""
key = self.key
obj = self.connection.hgetall(key)
obj = decode_redis_hash(self.connection.hgetall(key))
if len(obj) == 0:
raise NoSuchJobError('No such job: %s' % (key,))
@ -291,7 +296,7 @@ class Job(object):
if date_str is None:
return None
else:
return times.to_universal(date_str)
return times.to_universal(as_text(date_str))
try:
self.data = obj['data']
@ -303,24 +308,21 @@ class Job(object):
except UnpickleError:
if not safe:
raise
self.created_at = to_date(obj.get('created_at'))
self.origin = obj.get('origin')
self.description = obj.get('description')
self.enqueued_at = to_date(obj.get('enqueued_at'))
self.ended_at = to_date(obj.get('ended_at'))
self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = obj.get('dependency_id', None)
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key."""
key = self.key
connection = pipeline if pipeline is not None else self.connection
def dump(self):
"""Returns a serialization of the current job instance"""
obj = {}
obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')
@ -349,7 +351,14 @@ class Job(object):
if self.meta:
obj['meta'] = dumps(self.meta)
connection.hmset(key, obj)
return obj
def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key."""
key = self.key
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.dump())
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@ -377,7 +386,6 @@ class Job(object):
assert self.id == _job_stack.pop()
return self._result
def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result
will be persisted. In the future, this method will also be responsible
@ -406,7 +414,7 @@ class Job(object):
- If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist
forever)
"""
"""
if ttl == 0:
self.cancel()
elif ttl > 0:
@ -436,46 +444,4 @@ class Job(object):
def __hash__(self):
return hash(self.id)
# Backwards compatibility for custom properties
def __getattr__(self, name): # noqa
import warnings
warnings.warn(
"Getting custom properties from the job instance directly "
"will be unsupported as of RQ 0.4. Please use the meta dict "
"to store all custom variables. So instead of this:\n\n"
"\tjob.foo\n\n"
"Use this:\n\n"
"\tjob.meta['foo']\n",
SyntaxWarning)
try:
return self.__dict__['meta'][name] # avoid recursion
except KeyError:
return getattr(super(Job, self), name)
def __setattr__(self, name, value):
# Ignore the "private" fields
private_attrs = set(('origin', '_func_name', 'ended_at',
'description', '_args', 'created_at', 'enqueued_at', 'connection',
'_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id',
'data', '_instance', 'result_ttl', '_status', 'status',
'_dependency_id', '_dependency', 'dependency', 'meta'))
if name in private_attrs:
object.__setattr__(self, name, value)
return
import warnings
warnings.warn(
"Setting custom properties on the job instance directly will "
"be unsupported as of RQ 0.4. Please use the meta dict to "
"store all custom variables. So instead of this:\n\n"
"\tjob.foo = 'bar'\n\n"
"Use this:\n\n"
"\tjob.meta['foo'] = 'bar'\n",
SyntaxWarning)
self.__dict__['meta'][name] = value
_job_stack = LocalStack()

@ -13,11 +13,14 @@
# current thread ident.
try:
from greenlet import getcurrent as get_ident
except ImportError: # noqa
except ImportError: # noqa
try:
from thread import get_ident # noqa
except ImportError: # noqa
from dummy_thread import get_ident # noqa
except ImportError: # noqa
try:
from _thread import get_ident # noqa
except ImportError: # noqa
from dummy_thread import get_ident # noqa
def release_local(local):
@ -116,6 +119,7 @@ class LocalStack(object):
def _get__ident_func__(self):
return self._local.__ident_func__
def _set__ident_func__(self, value): # noqa
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)

@ -1,9 +1,12 @@
import times
import uuid
from .connections import resolve_connection
from .job import Job, Status
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError)
from .compat import total_ordering
from .compat import total_ordering, string_types, as_text
from redis import WatchError
@ -29,8 +32,9 @@ class Queue(object):
connection = resolve_connection(connection)
def to_queue(queue_key):
return cls.from_queue_key(queue_key, connection=connection)
return map(to_queue, connection.keys('%s*' % prefix))
return cls.from_queue_key(as_text(queue_key),
connection=connection)
return list(map(to_queue, connection.keys('%s*' % prefix)))
@classmethod
def from_queue_key(cls, queue_key, connection=None):
@ -76,17 +80,19 @@ class Queue(object):
return None
return job
def get_job_ids(self, start=0, limit=-1):
def get_job_ids(self, offset=0, length=-1):
"""Returns a slice of job IDs in the queue."""
if limit >= 0:
end = start + limit
start = offset
if length >= 0:
end = offset + (length - 1)
else:
end = limit
return self.connection.lrange(self.key, start, end)
end = length
return [as_text(job_id) for job_id in
self.connection.lrange(self.key, start, end)]
def get_jobs(self, start=0, limit=-1):
def get_jobs(self, offset=0, length=-1):
"""Returns a slice of jobs in the queue."""
job_ids = self.get_job_ids(start, limit)
job_ids = self.get_job_ids(offset, length)
return compact([self.safe_fetch_job(job_id) for job_id in job_ids])
@property
@ -113,11 +119,11 @@ class Queue(object):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
"""
COMPACT_QUEUE = 'rq:queue:_compact'
COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
self.connection.rename(self.key, COMPACT_QUEUE)
while True:
job_id = self.connection.lpop(COMPACT_QUEUE)
job_id = as_text(self.connection.lpop(COMPACT_QUEUE))
if job_id is None:
break
if Job.exists(job_id, self.connection):
@ -130,7 +136,7 @@ class Queue(object):
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, after=None):
result_ttl=None, description=None, after=None):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -139,10 +145,11 @@ class Queue(object):
contain options for RQ itself.
"""
timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED,
dependency=after)
description=description, dependency=after)
# If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it.
@ -177,19 +184,20 @@ class Queue(object):
* A string, representing the location of a function (must be
meaningful to the import context of the workers)
"""
if not isinstance(f, basestring) and f.__module__ == '__main__':
raise ValueError(
'Functions from the __main__ module cannot be processed '
'by workers.')
if not isinstance(f, string_types) and f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed '
'by workers.')
# Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
timeout = None
description = None
result_ttl = None
after = None
if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None)
args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None)
after = kwargs.pop('after', None)
@ -197,7 +205,7 @@ class Queue(object):
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl,
after=after)
description=description, after=after)
def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution.
@ -240,7 +248,7 @@ class Queue(object):
def pop_job_id(self):
"""Pops a given job ID from this Redis queue."""
return self.connection.lpop(self.key)
return as_text(self.connection.lpop(self.key))
@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
@ -310,7 +318,7 @@ class Queue(object):
result = cls.lpop(queue_keys, timeout, connection=connection)
if result is None:
return None
queue_key, job_id = result
queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, connection=connection)
try:
job = Job.fetch(job_id, connection=connection)

@ -1,35 +1,45 @@
import importlib
import redis
from warnings import warn
from rq import use_connection
def add_standard_arguments(parser):
parser.add_argument('--config', '-c', default=None,
help='Module containing RQ settings.')
help='Module containing RQ settings.')
parser.add_argument('--url', '-u', default=None,
help='URL describing Redis connection details. '
'Overrides other connection arguments if supplied.')
help='URL describing Redis connection details. '
'Overrides other connection arguments if supplied.')
parser.add_argument('--host', '-H', default=None,
help='The Redis hostname (default: localhost)')
help='The Redis hostname (default: localhost)')
parser.add_argument('--port', '-p', default=None,
help='The Redis portnumber (default: 6379)')
help='The Redis portnumber (default: 6379)')
parser.add_argument('--db', '-d', type=int, default=None,
help='The Redis database (default: 0)')
help='The Redis database (default: 0)')
parser.add_argument('--password', '-a', default=None,
help='The Redis password (default: None)')
help='The Redis password (default: None)')
parser.add_argument('--socket', '-s', default=None,
help='The Redis Unix socket')
help='The Redis Unix socket')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
return dict([(k, v)
for k, v in settings.__dict__.items()
if k.upper() == k])
for k, v in settings.__dict__.items()
if k.upper() == k])
def setup_default_arguments(args, settings):
""" Sets up args from settings or defaults """
if args.url is None:
args.url = settings.get('REDIS_URL')
if (args.host or args.port or args.socket or args.db or args.password):
warn('Host, port, db, password options for Redis will not be '
'supported in future versions of RQ. '
'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning)
if args.host is None:
args.host = settings.get('REDIS_HOST', 'localhost')
@ -54,5 +64,5 @@ def setup_redis(args):
redis_conn = redis.StrictRedis.from_url(args.url)
else:
redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
password=args.password, unix_socket_path=args.socket)
password=args.password, unix_socket_path=args.socket)
use_connection(redis_conn)

@ -44,7 +44,7 @@ def state_symbol(state):
def show_queues(args):
if len(args.queues):
qs = map(Queue, args.queues)
qs = list(map(Queue, args.queues))
else:
qs = Queue.all()
@ -79,7 +79,7 @@ def show_queues(args):
def show_workers(args):
if len(args.queues):
qs = map(Queue, args.queues)
qs = list(map(Queue, args.queues))
def any_matching_queue(worker):
def queue_matches(q):
@ -101,9 +101,9 @@ def show_workers(args):
for w in ws:
worker_queues = filter_queues(w.queue_names())
if not args.raw:
print '%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(worker_queues))
print('%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(worker_queues)))
else:
print 'worker %s %s %s' % (w.name, w.state, ','.join(worker_queues))
print('worker %s %s %s' % (w.name, w.state, ','.join(worker_queues)))
else:
# Create reverse lookup table
queues = dict([(q, []) for q in qs])
@ -119,21 +119,21 @@ def show_workers(args):
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.state)), queues[q])))
else:
queues_str = ''
print '%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)
print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
if not args.raw:
print '%d workers, %d queues' % (len(ws), len(qs))
print('%d workers, %d queues' % (len(ws), len(qs)))
def show_both(args):
show_queues(args)
if not args.raw:
print ''
print('')
show_workers(args)
if not args.raw:
print ''
print('')
import datetime
print 'Updated: %s' % datetime.datetime.now()
print('Updated: %s' % datetime.datetime.now())
def parse_args():
@ -186,5 +186,5 @@ def main():
print(e)
sys.exit(1)
except KeyboardInterrupt:
print
print()
sys.exit(0)

@ -27,6 +27,8 @@ def parse_args():
parser.add_argument('--verbose', '-v', action='store_true', default=False, help='Show more output')
parser.add_argument('--quiet', '-q', action='store_true', default=False, help='Show less output')
parser.add_argument('--sentry-dsn', action='store', default=None, metavar='URL', help='Report exceptions to this Sentry DSN')
parser.add_argument('--pid', action='store', default=None,
help='Write the process ID number to a file at the specified path')
parser.add_argument('queues', nargs='*', help='The queues to listen on (default: \'default\')')
return parser.parse_args()
@ -65,13 +67,17 @@ def main():
args.sentry_dsn = settings.get('SENTRY_DSN',
os.environ.get('SENTRY_DSN', None))
if args.pid:
with open(os.path.expanduser(args.pid), "w") as fp:
fp.write(str(os.getpid()))
setup_loghandlers_from_args(args)
setup_redis(args)
cleanup_ghosts()
try:
queues = map(Queue, args.queues)
queues = list(map(Queue, args.queues))
w = Worker(queues, name=args.name)
# Should we configure Sentry?

@ -33,7 +33,7 @@ class death_penalty_after(object):
def handle_death_penalty(self, signum, frame):
raise JobTimeoutException('Job exceeded maximum timeout '
'value (%d seconds).' % self._timeout)
'value (%d seconds).' % self._timeout)
def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises

@ -16,8 +16,7 @@ def gettermsize():
def ioctl_GWINSZ(fd):
try:
import fcntl, termios, struct # noqa
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
'1234'))
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
except:
return None
return cr
@ -53,7 +52,7 @@ class _Colorizer(object):
self.codes["overline"] = esc + "06m"
dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue",
"purple", "teal", "lightgray"]
"purple", "teal", "lightgray"]
light_colors = ["darkgray", "red", "green", "yellow", "blue",
"fuchsia", "turquoise", "white"]
@ -69,8 +68,11 @@ class _Colorizer(object):
self.codes["darkyellow"] = self.codes["brown"]
self.codes["fuscia"] = self.codes["fuchsia"]
self.codes["white"] = self.codes["bold"]
self.notty = not sys.stdout.isatty()
if hasattr(sys.stdout, "isatty"):
self.notty = not sys.stdout.isatty()
else:
self.notty = True
def reset_color(self):
return self.codes["reset"]
@ -136,7 +138,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
def __init__(self, exclude=None, *args, **kwargs):
self.exclude = exclude
if is_python_version((2,6)):
if is_python_version((2, 6)):
logging.StreamHandler.__init__(self, *args, **kwargs)
else:
super(ColorizingStreamHandler, self).__init__(*args, **kwargs)

@ -1 +1 @@
VERSION = '0.3.8-dev'
VERSION = '0.3.11'

@ -21,6 +21,7 @@ from .logutils import setup_loghandlers
from .exceptions import NoQueueError, UnpickleError, DequeueTimeout
from .timeouts import death_penalty_after
from .version import VERSION
from rq.compat import text_type, as_text
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
@ -42,9 +43,9 @@ def iterable(x):
def compact(l):
return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame) \
for signame in dir(signal) \
if signame.startswith('SIG') and '_' not in signame)
_signames = dict((getattr(signal, signame), signame)
for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum):
@ -67,8 +68,8 @@ class Worker(object):
if connection is None:
connection = get_current_connection()
reported_working = connection.smembers(cls.redis_workers_keys)
workers = [cls.find_by_key(key, connection) for key in
reported_working]
workers = [cls.find_by_key(as_text(key), connection)
for key in reported_working]
return compact(workers)
@classmethod
@ -90,17 +91,16 @@ class Worker(object):
name = worker_key[len(prefix):]
worker = cls([], name, connection=connection)
queues = connection.hget(worker.key, 'queues')
queues = as_text(connection.hget(worker.key, 'queues'))
worker._state = connection.hget(worker.key, 'state') or '?'
if queues:
worker.queues = [Queue(queue, connection=connection)
for queue in queues.split(',')]
for queue in queues.split(',')]
return worker
def __init__(self, queues, name=None,
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
if connection is None:
connection = get_current_connection()
self.connection = connection
@ -192,9 +192,8 @@ class Worker(object):
self.log.debug('Registering birth of worker %s' % (self.name,))
if self.connection.exists(self.key) and \
not self.connection.hexists(self.key, 'death'):
raise ValueError(
'There exists an active worker named \'%s\' '
'already.' % (self.name,))
raise ValueError('There exists an active worker named \'%s\' '
'already.' % (self.name,))
key = self.key
now = time.time()
queues = ','.join(self.queue_names())
@ -303,8 +302,8 @@ class Worker(object):
qnames = self.queue_names()
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' % \
green(', '.join(qnames)))
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
result = self.dequeue_job_and_maintain_ttl(timeout)
@ -323,7 +322,7 @@ class Worker(object):
# Use the public setter here, to immediately update Redis
job.status = Status.STARTED
self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id))
blue(job.description), job.id))
self.connection.expire(self.key, (job.timeout or 180) + 60)
self.fork_and_perform_job(job)
@ -337,19 +336,17 @@ class Worker(object):
self.register_death()
return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout):
while True:
try:
return Queue.dequeue_any(self.queues, timeout,
connection=self.connection)
connection=self.connection)
except DequeueTimeout:
pass
self.log.debug('Sending heartbeat to prevent worker timeout.')
self.connection.expire(self.key, self.default_worker_ttl)
def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
@ -433,7 +430,7 @@ class Worker(object):
if rv is None:
self.log.info('Job OK')
else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
if result_ttl == 0:
self.log.info('Result discarded immediately.')
@ -444,12 +441,10 @@ class Worker(object):
return True
def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
exc_string = ''.join(
traceback.format_exception_only(*exc_info[:2]) +
traceback.format_exception(*exc_info))
exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
traceback.format_exception(*exc_info))
self.log.error(exc_string)
for handler in reversed(self._exc_handlers):

@ -10,9 +10,9 @@ from setuptools import setup, find_packages
def get_version():
basedir = os.path.dirname(__file__)
with open(os.path.join(basedir, 'rq/version.py')) as f:
VERSION = None
exec(f.read())
return VERSION
locals = {}
exec(f.read(), locals)
return locals['VERSION']
raise RuntimeError('No version info found.')

@ -53,7 +53,7 @@ class RQTestCase(unittest.TestCase):
cls.testconn = testconn
# Shut up logging
logging.disable("ERROR")
logging.disable(logging.ERROR)
def setUp(self):
# Flush beforewards (we like our hygiene)

@ -3,7 +3,10 @@ from datetime import datetime
from tests import RQTestCase
from tests.fixtures import Number, some_calculation, say_hello, access_self
from tests.helpers import strip_milliseconds
from cPickle import loads
try:
from cPickle import loads
except ImportError:
from pickle import loads
from rq.job import Job, get_current_job
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.queue import Queue
@ -75,7 +78,7 @@ class TestJob(RQTestCase):
# Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False)
job.save()
self.assertEquals(self.testconn.type(job.key), 'hash')
self.assertEquals(self.testconn.type(job.key), b'hash')
# Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data'))
@ -85,9 +88,9 @@ class TestJob(RQTestCase):
"""Fetching jobs."""
# Prepare test
self.testconn.hset('rq:job:some_id', 'data',
"(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa
"(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.")
self.testconn.hset('rq:job:some_id', 'created_at',
"2012-02-07 22:13:24+0000")
'2012-02-07 22:13:24+0000')
# Fetch returns a job
job = Job.fetch('some_id')
@ -105,15 +108,15 @@ class TestJob(RQTestCase):
job.save()
expected_date = strip_milliseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at')
stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8')
self.assertEquals(
times.to_universal(stored_date),
expected_date)
times.to_universal(stored_date),
expected_date)
# ... and no other keys are stored
self.assertItemsEqual(
self.testconn.hkeys(job.key),
['created_at'])
self.assertEqual(
self.testconn.hkeys(job.key),
[b'created_at'])
def test_persistence_of_typical_jobs(self):
"""Storing typical jobs."""
@ -121,15 +124,15 @@ class TestJob(RQTestCase):
job.save()
expected_date = strip_milliseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at')
stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8')
self.assertEquals(
times.to_universal(stored_date),
expected_date)
times.to_universal(stored_date),
expected_date)
# ... and no other keys are stored
self.assertItemsEqual(
self.testconn.hkeys(job.key),
['created_at', 'data', 'description'])
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description'])
def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key."""
@ -185,7 +188,7 @@ class TestJob(RQTestCase):
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('say_hello', 'shut_up')
unimportable_data = data.replace(b'say_hello', b'shut_up')
self.testconn.hset(job.key, 'data', unimportable_data)
job.refresh()
@ -208,14 +211,27 @@ class TestJob(RQTestCase):
"""Ensure that job's result_ttl is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, 10)
job = Job.create(func=say_hello, args=('Lionel',))
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None)
def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), description=u'Say hello!')
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, u'Say hello!')
# Ensure job description is constructed from function call string
job = Job.create(func=say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
# Executing the job function from outside of RQ throws an exception
@ -253,7 +269,7 @@ class TestJob(RQTestCase):
"""Test that jobs and results are expired properly."""
job = Job.create(func=say_hello)
job.save()
# Jobs with negative TTLs don't expire
job.cleanup(ttl=-1)
self.assertEqual(self.testconn.ttl(job.key), -1)

@ -107,7 +107,9 @@ class TestQueue(RQTestCase):
# Inspect data inside Redis
q_key = 'rq:queue:default'
self.assertEquals(self.testconn.llen(q_key), 1)
self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
self.assertEquals(
self.testconn.lrange(q_key, 0, -1)[0].decode('ascii'),
job_id)
def test_enqueue_sets_metadata(self):
"""Enqueueing job onto queues modifies meta data."""
@ -268,7 +270,7 @@ class TestQueue(RQTestCase):
self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self):
"""Test enqueueing job with dependency"""
"""Jobs are enqueued only when their dependencies are finished"""
# Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello)
q = Queue()
@ -290,7 +292,7 @@ class TestFailedQueue(RQTestCase):
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
self.assertItemsEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEquals(get_failed_queue().count, 1)
get_failed_queue().requeue(job.id)

@ -0,0 +1,29 @@
from tests import RQTestCase
from rq import Queue, Worker, get_failed_queue
from rq.contrib.sentry import register_sentry
class FakeSentry(object):
def captureException(self, *args, **kwds):
pass # we cannot check this, because worker forks
class TestSentry(RQTestCase):
def test_work_fails(self):
"""Non importable jobs should be put on the failed queue event with sentry"""
q = Queue()
failed_q = get_failed_queue()
# Action
q.enqueue('_non.importable.job')
self.assertEquals(q.count, 1)
w = Worker([q])
register_sentry(FakeSentry(), w)
w.work(burst=True)
# Postconditions
self.assertEquals(failed_q.count, 1)
self.assertEquals(q.count, 0)

@ -57,7 +57,7 @@ class TestWorker(RQTestCase):
job = Job.create(func=div_by_zero, args=(3,))
job.save()
data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace('div_by_zero', 'nonexisting_job')
invalid_data = data.replace(b'div_by_zero', b'nonexisting_job')
assert data != invalid_data
self.testconn.hset(job.key, 'data', invalid_data)

@ -1,8 +1,8 @@
[tox]
envlist=py26,py27,pypy
envlist=py26,py27,py33,pypy
[testenv]
commands=py.test
commands=py.test []
deps=pytest
[testenv:py26]

Loading…
Cancel
Save