First stab at implementing job dependencies.

main
Selwin Ong 12 years ago
parent a0c9267550
commit eadc7db29f

@ -64,7 +64,7 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None):
result_ttl=None, status=None, parent=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@ -87,6 +87,9 @@ class Job(object):
job.description = job.get_call_string()
job.result_ttl = result_ttl
job._status = status
# parent could be job instance or id
if parent is not None:
job._parent_id = parent.id if isinstance(parent, Job) else parent
return job
@property
@ -119,6 +122,20 @@ class Job(object):
def is_started(self):
return self.status == Status.STARTED
@property
def parent(self):
"""Returns a job's parent. To avoid repeated Redis fetches, we cache
job.parent as job._parent.
"""
if self._parent_id is None:
return None
if hasattr(self, '_parent'):
return self._parent
job = Job.fetch(self._parent_id, connection=self.connection)
job.refresh()
self._parent = job
return job
@property
def func(self):
func_name = self.func_name
@ -185,6 +202,7 @@ class Job(object):
self.timeout = None
self.result_ttl = None
self._status = None
self._parent_id = None
self.meta = {}
@ -208,11 +226,21 @@ class Job(object):
"""The Redis key that is used to store job hash under."""
return 'rq:job:%s' % (job_id,)
@classmethod
def waitlist_key_for(cls, job_id):
"""The Redis key that is used to store job hash under."""
return 'rq:job:%s:waitlist' % (job_id,)
@property
def key(self):
"""The Redis key that is used to store job hash under."""
return self.key_for(self.id)
@property
def waitlist_key(self):
"""The Redis key that is used to store job hash under."""
return self.waitlist_key_for(self.id)
@property # noqa
def job_tuple(self):
"""Returns the job tuple that encodes the actual function call that
@ -285,6 +313,7 @@ class Job(object):
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None
self._parent_id = obj.get('parent_id', None)
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self, pipeline=None):
@ -315,6 +344,8 @@ class Job(object):
obj['result_ttl'] = self.result_ttl
if self._status is not None:
obj['status'] = self._status
if self._parent_id is not None:
obj['parent_id'] = self._parent_id
if self.meta:
obj['meta'] = dumps(self.meta)
@ -381,7 +412,26 @@ class Job(object):
elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl)
def register_dependency(self):
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued
only if the parent job is successfully performed. We maintain this
waitlist in Redis, with key that looks something like:
rq:job:job_id:waitlist = ['job_id_1', 'job_id_2']
This method puts the job on it's parent's waitlist.
"""
# TODO: This can probably be pipelined
self.connection.rpush(Job.waitlist_key_for(self._parent_id), self.id)
def get_waitlist(self):
"""Returns all job ids in the waitlist.
"""
# TODO: This can probably be pipelined
return self.connection.lrange(self.waitlist_key, 0, -1)
def __str__(self):
return '<Job %s: %s>' % (self.id, self.description)
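The pipelining TODO on register_dependency could be handled the same way cleanup() handles it above, by accepting an optional pipeline argument. A hypothetical sketch, not part of this commit:

def register_dependency(self, pipeline=None):
    """Puts this job on its parent's waitlist, optionally via a pipeline."""
    # Reuse the caller's pipeline when given (mirrors cleanup() above),
    # so the RPUSH can be batched with other commands such as save().
    connection = pipeline if pipeline is not None else self.connection
    connection.rpush(Job.waitlist_key_for(self._parent_id), self.id)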
@ -413,10 +463,11 @@ class Job(object):
def __setattr__(self, name, value):
# Ignore the "private" fields
private_attrs = set(['origin', '_func_name', 'ended_at',
private_attrs = set(('origin', '_func_name', 'ended_at',
'description', '_args', 'created_at', 'enqueued_at', 'connection',
'_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id',
'data', '_instance', 'result_ttl', '_status', 'status', 'meta'])
'data', '_instance', 'result_ttl', '_status', 'status',
'_parent_id', '_parent', 'parent', 'meta'))
if name in private_attrs:
object.__setattr__(self, name, value)

@ -113,7 +113,8 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, after=None):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -123,7 +124,14 @@ class Queue(object):
"""
timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED)
result_ttl=result_ttl, status=Status.QUEUED,
parent=after)
# If the job depends on another job to finish, register it on its
# parent's waitlist instead of enqueueing it
if after is not None:
job.register_dependency()
job.save()
return job
return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs):
@ -149,15 +157,18 @@ class Queue(object):
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
timeout = None
result_ttl = None
if 'args' in kwargs or 'kwargs' in kwargs:
after = None
if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None)
args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None)
after = kwargs.pop('after', None)
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl)
timeout=timeout, result_ttl=result_ttl,
after=after)
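For context, the new after argument is meant to be used like this (a usage sketch assuming a connected Queue and the say_hello fixture from the tests below):

q = Queue()
parent = q.enqueue(say_hello)               # queued immediately
child = q.enqueue(say_hello, after=parent)  # saved and waitlisted, not queued
# child.id now sits on rq:job:<parent.id>:waitlist until the parent job
# has been performed and its waitlist is enqueued.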
def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution.
@ -188,6 +199,16 @@ class Queue(object):
job.save()
return job
def enqueue_waitlist(self, job):
"""Enqueues all jobs in the waitlist and clears it"""
# TODO: can probably be pipelined
job_ids = job.get_waitlist()
for job_id in job_ids:
waitlisted_job = Job.fetch(job_id, connection=self.connection)
self.enqueue_job(waitlisted_job)
if job_ids:
self.connection.delete(job.waitlist_key)
def pop_job_id(self):
"""Pops a given job ID from this Redis queue."""
return self.connection.lpop(self.key)
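The TODO on enqueue_waitlist could be addressed by fetching and clearing the waitlist in a single round trip. A hypothetical sketch, not part of this commit:

def enqueue_waitlist(self, job):
    """Enqueues all jobs in the waitlist and clears it."""
    # redis-py pipelines are transactional (MULTI/EXEC) by default, so the
    # read and the delete happen atomically in one round trip.
    pipe = self.connection.pipeline()
    pipe.lrange(job.waitlist_key, 0, -1)
    pipe.delete(job.waitlist_key)
    job_ids, _ = pipe.execute()
    for job_id in job_ids:
        self.enqueue_job(Job.fetch(job_id, connection=self.connection))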

@ -328,6 +328,7 @@ class Worker(object):
self.connection.expire(self.key, (job.timeout or 180) + 60)
self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl)
queue.enqueue_waitlist(job)
did_perform_work = True
finally:

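With the worker hook in place, a burst worker drains a dependency chain in one pass; a small usage sketch based on the new worker test below:

q = Queue()
parent = q.enqueue(say_hello)
child = q.enqueue_call(say_hello, after=parent)  # waitlisted, not queued
Worker([q]).work(burst=True)
# The worker performs the parent, enqueues its waitlist, and then picks
# up and performs the child before the burst run ends.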
@ -131,6 +131,22 @@ class TestJob(RQTestCase):
self.testconn.hkeys(job.key),
['created_at', 'data', 'description'])
def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=some_calculation)
parent_job.save()
job = Job.create(func=some_calculation, parent=parent_job)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._parent_id, parent_job.id)
self.assertEqual(stored_job.parent, parent_job)
job = Job.create(func=some_calculation, parent=parent_job.id)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._parent_id, parent_job.id)
self.assertEqual(stored_job.parent, parent_job)
def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
@ -242,10 +258,27 @@ class TestJob(RQTestCase):
job.cleanup(ttl=-1)
self.assertEqual(self.testconn.ttl(job.key), -1)
# Jobs with positive TTLs are eventually deleted
job.cleanup(ttl=100)
self.assertEqual(self.testconn.ttl(job.key), 100)
# Jobs with 0 TTL are immediately deleted
job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self):
"""Test that jobs updates the correct job waitlist"""
job = Job.create(func=say_hello)
job._parent_id = 'id'
job.save()
job.register_dependency()
self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id)
def test_get_waitlist(self):
"""Test that all waitlisted job ids are fetched"""
job = Job.create(func=say_hello)
self.assertEqual(job.get_waitlist(), [])
self.testconn.lpush(job.waitlist_key, 'id_1')
self.assertEqual(job.get_waitlist(), ['id_1'])
self.testconn.lpush(job.waitlist_key, 'id_2')
self.assertEqual(job.get_waitlist(), ['id_2', 'id_1'])

@ -237,6 +237,31 @@ class TestQueue(RQTestCase):
job = q.enqueue(say_hello)
self.assertEqual(job.status, Status.QUEUED)
def test_enqueue_waitlist(self):
"""Enqueueing a waitlist pushes all jobs in waitlist to queue"""
q = Queue()
parent_job = Job.create(func=say_hello)
parent_job.save()
job_1 = Job.create(func=say_hello, parent=parent_job)
job_1.save()
job_1.register_dependency()
job_2 = Job.create(func=say_hello, parent=parent_job)
job_2.save()
job_2.register_dependency()
# After the waitlist is enqueued, job_1 and job_2 should be in the queue
self.assertEqual(q.job_ids, [])
q.enqueue_waitlist(parent_job)
self.assertEqual(q.job_ids, [job_1.id, job_2.id])
self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self):
"""Job with dependency is not queued right away"""
parent_job = Job.create(func=say_hello)
q = Queue()
q.enqueue_call(say_hello, after=parent_job)
self.assertEqual(q.job_ids, [])
class TestFailedQueue(RQTestCase):
def test_requeue_job(self):

@ -234,3 +234,12 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
def test_job_dependency(self):
q = Queue()
w = Worker([q])
parent_job = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, after=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.status, 'finished')
