Terminology change: waitlist -> dependents.

main
Vincent Driessen 12 years ago
parent 15c4b562ba
commit bb3dc5b0b2

@ -232,9 +232,9 @@ class Job(object):
return b'rq:job:' + job_id.encode('utf-8') return b'rq:job:' + job_id.encode('utf-8')
@classmethod @classmethod
def waitlist_key_for(cls, job_id): def dependents_key_for(cls, job_id):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job hash under."""
return 'rq:job:%s:waitlist' % (job_id,) return 'rq:job:%s:dependents' % (job_id,)
@property @property
def key(self): def key(self):
@ -242,9 +242,9 @@ class Job(object):
return self.key_for(self.id) return self.key_for(self.id)
@property @property
def waitlist_key(self): def dependents_key(self):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job hash under."""
return self.waitlist_key_for(self.id) return self.dependents_key_for(self.id)
@property # noqa @property # noqa
def job_tuple(self): def job_tuple(self):
@ -422,16 +422,17 @@ class Job(object):
connection.expire(self.key, ttl) connection.expire(self.key, ttl)
def register_dependency(self): def register_dependency(self):
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued """Jobs may have dependencies. Jobs are enqueued only if the job they
only if the dependency job is successfully performed. We maintain this depend on is successfully performed. We record this relation as
waitlist in a Redis set, with key that looks something like: a reverse dependency (a Redis set), with a key that looks something
like:
rq:job:job_id:waitlist = {'job_id_1', 'job_id_2'}
rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
This method puts the job on it's dependency's waitlist.
This method adds the current job in its dependency's dependents set.
""" """
# TODO: This can probably be pipelined # TODO: This can probably be pipelined
self.connection.sadd(Job.waitlist_key_for(self._dependency_id), self.id) self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -156,7 +156,7 @@ class Queue(object):
description=description, depends_on=depends_on) description=description, depends_on=depends_on)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it. # parent's dependents instead of enqueueing it.
# If WatchError is raised in the process, that means something else is # If WatchError is raised in the process, that means something else is
# modifying the dependency. In this case we simply retry # modifying the dependency. In this case we simply retry
if depends_on is not None: if depends_on is not None:
@ -243,15 +243,15 @@ class Queue(object):
job.save() job.save()
return job return job
def enqueue_waitlist(self, job): def enqueue_dependents(self, job):
"""Enqueues all jobs in the waitlist and clears it""" """Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined # TODO: can probably be pipelined
while True: while True:
job_id = as_text(self.connection.spop(job.waitlist_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
waitlisted_job = Job.fetch(job_id, connection=self.connection) dependent = Job.fetch(job_id, connection=self.connection)
self.enqueue_job(waitlisted_job) self.enqueue_job(dependent)
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""

@ -327,7 +327,7 @@ class Worker(object):
self.fork_and_perform_job(job) self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl) self.connection.expire(self.key, self.default_worker_ttl)
if job.status == 'finished': if job.status == 'finished':
queue.enqueue_waitlist(job) queue.enqueue_dependents(job)
did_perform_work = True did_perform_work = True
finally: finally:

@ -284,9 +284,9 @@ class TestJob(RQTestCase):
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self): def test_register_dependency(self):
"""Test that jobs updates the correct job waitlist""" """Test that jobs updates the correct job dependents."""
job = Job.create(func=say_hello) job = Job.create(func=say_hello)
job._dependency_id = 'id' job._dependency_id = 'id'
job.save() job.save()
job.register_dependency() job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:waitlist')), job.id) self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)

@ -289,8 +289,8 @@ class TestQueue(RQTestCase):
# Queue.all() should still report the empty queues # Queue.all() should still report the empty queues
self.assertEquals(len(Queue.all()), 3) self.assertEquals(len(Queue.all()), 3)
def test_enqueue_waitlist(self): def test_enqueue_dependents(self):
"""Enqueueing a waitlist pushes all jobs in waitlist to queue""" """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue."""
q = Queue() q = Queue()
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save() parent_job.save()
@ -301,11 +301,11 @@ class TestQueue(RQTestCase):
job_2.save() job_2.save()
job_2.register_dependency() job_2.register_dependency()
# After waitlist is enqueued, job_1 and job_2 should be in queue # After dependents is enqueued, job_1 and job_2 should be in queue
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
q.enqueue_waitlist(parent_job) q.enqueue_dependents(parent_job)
self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id])) self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) self.assertFalse(self.testconn.exists(parent_job.dependents_key))
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished""" """Jobs are enqueued only when their dependencies are finished"""

@ -236,7 +236,7 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_failed, True) self.assertEqual(job.is_failed, True)
def test_job_dependency(self): def test_job_dependency(self):
"""Enqueue waitlisted jobs only if their parents don't fail""" """Enqueue dependent jobs only if their parents don't fail"""
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
parent_job = q.enqueue(say_hello) parent_job = q.enqueue(say_hello)

Loading…
Cancel
Save