Merge remote-tracking branch 'upstream/master'

Conflicts:
	tests/test_queue.py
main
Rob Harrigan 11 years ago
commit 02c5d902ec

@ -1,9 +1,14 @@
### 0.3.12
### 0.4.0
(not released yet)
- Job dependencies! Thanks, Selwin.
- Ability to provide a custom job description (instead of using the default
function invocation hint). Thanks, İbrahim.
- Temporary key for the compact queue is now randomly generated, which should
avoid name clashes for concurrent compact actions.
### 0.3.11
(August 23th, 2013)

@ -61,13 +61,11 @@ def resolve_connection(connection=None):
connection = get_current_connection()
if connection is None:
raise NoRedisConnectionException(
'Could not resolve a Redis connection.')
raise NoRedisConnectionException('Could not resolve a Redis connection.')
return connection
_connection_stack = LocalStack()
__all__ = ['Connection',
'get_current_connection', 'push_connection', 'pop_connection',
'use_connection']
__all__ = ['Connection', 'get_current_connection', 'push_connection',
'pop_connection', 'use_connection']

@ -4,8 +4,8 @@ from .connections import resolve_connection
from .worker import DEFAULT_RESULT_TTL
from rq.compat import string_types
class job(object):
class job(object):
def __init__(self, queue, connection=None, timeout=None,
result_ttl=DEFAULT_RESULT_TTL):
"""A decorator that adds a ``delay`` method to the decorated function,

@ -15,5 +15,6 @@ class UnpickleError(Exception):
super(UnpickleError, self).__init__(message, inner_exception)
self.raw_data = raw_data
class DequeueTimeout(Exception):
pass

@ -16,7 +16,8 @@ def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
return type(name, (), values)
Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed',
Status = enum('Status',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
@ -68,7 +69,7 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None):
result_ttl=None, status=None, description=None, dependency=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@ -91,6 +92,9 @@ class Job(object):
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job._status = status
# dependency could be job instance or id
if dependency is not None:
job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency
return job
@property
@ -123,6 +127,20 @@ class Job(object):
def is_started(self):
return self.status == Status.STARTED
@property
def dependency(self):
"""Returns a job's dependency. To avoid repeated Redis fetches, we cache
job.dependency as job._dependency.
"""
if self._dependency_id is None:
return None
if hasattr(self, '_dependency'):
return self._dependency
job = Job.fetch(self._dependency_id, connection=self.connection)
job.refresh()
self._dependency = job
return job
@property
def func(self):
func_name = self.func_name
@ -189,6 +207,7 @@ class Job(object):
self.timeout = None
self.result_ttl = None
self._status = None
self._dependency_id = None
self.meta = {}
@ -212,11 +231,21 @@ class Job(object):
"""The Redis key that is used to store job hash under."""
return b'rq:job:' + job_id.encode('utf-8')
@classmethod
def waitlist_key_for(cls, job_id):
"""The Redis key that is used to store job hash under."""
return 'rq:job:%s:waitlist' % (job_id,)
@property
def key(self):
"""The Redis key that is used to store job hash under."""
return self.key_for(self.id)
@property
def waitlist_key(self):
"""The Redis key that is used to store job hash under."""
return self.waitlist_key_for(self.id)
@property # noqa
def job_tuple(self):
"""Returns the job tuple that encodes the actual function call that
@ -289,13 +318,11 @@ class Job(object):
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key."""
key = self.key
connection = pipeline if pipeline is not None else self.connection
def dump(self):
"""Returns a serialization of the current job instance"""
obj = {}
obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')
@ -319,10 +346,19 @@ class Job(object):
obj['result_ttl'] = self.result_ttl
if self._status is not None:
obj['status'] = self._status
if self._dependency_id is not None:
obj['dependency_id'] = self._dependency_id
if self.meta:
obj['meta'] = dumps(self.meta)
connection.hmset(key, obj)
return obj
def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key."""
key = self.key
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.dump())
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@ -350,7 +386,6 @@ class Job(object):
assert self.id == _job_stack.pop()
return self._result
def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result
will be persisted. In the future, this method will also be responsible
@ -386,6 +421,17 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl)
def register_dependency(self):
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued
only if the dependency job is successfully performed. We maintain this
waitlist in Redis, with key that looks something like:
rq:job:job_id:waitlist = ['job_id_1', 'job_id_2']
This method puts the job on it's dependency's waitlist.
"""
# TODO: This can probably be pipelined
self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id)
def __str__(self):
return '<Job %s: %s>' % (self.id, self.description)
@ -398,5 +444,4 @@ class Job(object):
def __hash__(self):
return hash(self.id)
_job_stack = LocalStack()

@ -119,6 +119,7 @@ class LocalStack(object):
def _get__ident_func__(self):
return self._local.__ident_func__
def _set__ident_func__(self, value): # noqa
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)

@ -1,10 +1,15 @@
import times
import uuid
from .connections import resolve_connection
from .job import Job, Status
from .exceptions import (NoSuchJobError, UnpickleError,
InvalidJobOperationError, DequeueTimeout)
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError)
from .compat import total_ordering, string_types, as_text
from redis import WatchError
def get_failed_queue(connection=None):
"""Returns a handle to the special failed queue."""
@ -115,7 +120,7 @@ class Queue(object):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
"""
COMPACT_QUEUE = 'rq:queue:_compact'
COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
self.connection.rename(self.key, COMPACT_QUEUE)
while True:
@ -130,8 +135,9 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None):
result_ttl=None, description=None, after=None):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -140,8 +146,29 @@ class Queue(object):
contain options for RQ itself.
"""
timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, description=description, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED)
# TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED,
description=description, dependency=after)
# If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it.
# If WatchError is raised in the process, that means something else is
# modifying the dependency. In this case we simply retry
if after is not None:
with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(after.key)
if after.status != Status.FINISHED:
job.register_dependency()
job.save()
return job
break
except WatchError:
continue
return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs):
@ -167,16 +194,19 @@ class Queue(object):
timeout = None
description = None
result_ttl = None
if 'args' in kwargs or 'kwargs' in kwargs:
after = None
if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None)
args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None)
after = kwargs.pop('after', None)
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, description=description,
timeout=timeout, result_ttl=result_ttl)
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl,
description=description, after=after)
def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution.
@ -210,6 +240,16 @@ class Queue(object):
job.save()
return job
def enqueue_waitlist(self, job):
"""Enqueues all jobs in the waitlist and clears it"""
# TODO: can probably be pipelined
while True:
job_id = as_text(self.connection.lpop(job.waitlist_key))
if job_id is None:
break
waitlisted_job = Job.fetch(job_id, connection=self.connection)
self.enqueue_job(waitlisted_job)
def pop_job_id(self):
"""Pops a given job ID from this Redis queue."""
return as_text(self.connection.lpop(self.key))

@ -1,5 +1,6 @@
import importlib
import redis
from warnings import warn
from rq import use_connection
@ -20,6 +21,7 @@ def add_standard_arguments(parser):
parser.add_argument('--socket', '-s', default=None,
help='The Redis Unix socket')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
@ -30,6 +32,14 @@ def read_config_file(module):
def setup_default_arguments(args, settings):
""" Sets up args from settings or defaults """
if args.url is None:
args.url = settings.get('REDIS_URL')
if (args.host or args.port or args.socket or args.db or args.password):
warn('Host, port, db, password options for Redis will not be '
'supported in future versions of RQ. '
'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning)
if args.host is None:
args.host = settings.get('REDIS_HOST', 'localhost')

@ -16,8 +16,7 @@ def gettermsize():
def ioctl_GWINSZ(fd):
try:
import fcntl, termios, struct # noqa
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
'1234'))
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
except:
return None
return cr
@ -139,7 +138,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
def __init__(self, exclude=None, *args, **kwargs):
self.exclude = exclude
if is_python_version((2,6)):
if is_python_version((2, 6)):
logging.StreamHandler.__init__(self, *args, **kwargs)
else:
super(ColorizingStreamHandler, self).__init__(*args, **kwargs)

@ -43,8 +43,8 @@ def iterable(x):
def compact(l):
return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame) \
for signame in dir(signal) \
_signames = dict((getattr(signal, signame), signame)
for signame in dir(signal)
if signame.startswith('SIG') and '_' not in signame)
@ -68,8 +68,8 @@ class Worker(object):
if connection is None:
connection = get_current_connection()
reported_working = connection.smembers(cls.redis_workers_keys)
workers = [cls.find_by_key(as_text(key), connection) for key in
reported_working]
workers = [cls.find_by_key(as_text(key), connection)
for key in reported_working]
return compact(workers)
@classmethod
@ -98,7 +98,6 @@ class Worker(object):
for queue in queues.split(',')]
return worker
def __init__(self, queues, name=None,
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
@ -193,8 +192,7 @@ class Worker(object):
self.log.debug('Registering birth of worker %s' % (self.name,))
if self.connection.exists(self.key) and \
not self.connection.hexists(self.key, 'death'):
raise ValueError(
'There exists an active worker named \'%s\' '
raise ValueError('There exists an active worker named \'%s\' '
'already.' % (self.name,))
key = self.key
now = time.time()
@ -304,7 +302,7 @@ class Worker(object):
qnames = self.queue_names()
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' % \
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
@ -329,6 +327,8 @@ class Worker(object):
self.connection.expire(self.key, (job.timeout or 180) + 60)
self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl)
if job.status == 'finished':
queue.enqueue_waitlist(job)
did_perform_work = True
finally:
@ -336,7 +336,6 @@ class Worker(object):
self.register_death()
return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout):
while True:
try:
@ -348,7 +347,6 @@ class Worker(object):
self.log.debug('Sending heartbeat to prevent worker timeout.')
self.connection.expire(self.key, self.default_worker_ttl)
def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
@ -443,11 +441,9 @@ class Worker(object):
return True
def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
exc_string = ''.join(
traceback.format_exception_only(*exc_info[:2]) +
exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
traceback.format_exception(*exc_info))
self.log.error(exc_string)

@ -7,6 +7,7 @@ try:
from cPickle import loads
except ImportError:
from pickle import loads
from rq.compat import as_text
from rq.job import Job, get_current_job
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.queue import Queue
@ -134,6 +135,22 @@ class TestJob(RQTestCase):
sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description'])
def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=some_calculation)
parent_job.save()
job = Job.create(func=some_calculation, dependency=parent_job)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
job = Job.create(func=some_calculation, dependency=parent_job.id)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
@ -265,3 +282,11 @@ class TestJob(RQTestCase):
# Jobs with 0 TTL are immediately deleted
job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self):
"""Test that jobs updates the correct job waitlist"""
job = Job.create(func=say_hello)
job._dependency_id = 'id'
job.save()
job.register_dependency()
self.assertEqual(as_text(self.testconn.lpop('rq:job:id:waitlist')), job.id)

@ -277,6 +277,39 @@ class TestQueue(RQTestCase):
self.testconn.srem(Queue.redis_queues_keys, s.key)
self.assertEquals(len(Queue.all()), 2)
def test_enqueue_waitlist(self):
"""Enqueueing a waitlist pushes all jobs in waitlist to queue"""
q = Queue()
parent_job = Job.create(func=say_hello)
parent_job.save()
job_1 = Job.create(func=say_hello, dependency=parent_job)
job_1.save()
job_1.register_dependency()
job_2 = Job.create(func=say_hello, dependency=parent_job)
job_2.save()
job_2.register_dependency()
# After waitlist is enqueued, job_1 and job_2 should be in queue
self.assertEqual(q.job_ids, [])
q.enqueue_waitlist(parent_job)
self.assertEqual(q.job_ids, [job_1.id, job_2.id])
self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished"""
# Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello)
q = Queue()
q.enqueue_call(say_hello, after=parent_job)
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.status = 'finished'
parent_job.save()
job = q.enqueue_call(say_hello, after=parent_job)
self.assertEqual(q.job_ids, [job.id])
class TestFailedQueue(RQTestCase):
def test_requeue_job(self):
"""Requeueing existing jobs."""

@ -234,3 +234,19 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
def test_job_dependency(self):
"""Enqueue waitlisted jobs only if their parents don't fail"""
q = Queue()
w = Worker([q])
parent_job = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, after=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.status, 'finished')
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, after=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertNotEqual(job.status, 'finished')

Loading…
Cancel
Save