Reverse dependency mapping should be a set instead of a list.

main
Selwin Ong 11 years ago
parent 93e5e552b7
commit 15c4b562ba

@ -424,14 +424,14 @@ class Job(object):
def register_dependency(self): def register_dependency(self):
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued """Jobs may have a waitlist. Jobs in this waitlist are enqueued
only if the dependency job is successfully performed. We maintain this only if the dependency job is successfully performed. We maintain this
waitlist in Redis, with key that looks something like: waitlist in a Redis set, with key that looks something like:
rq:job:job_id:waitlist = ['job_id_1', 'job_id_2'] rq:job:job_id:waitlist = {'job_id_1', 'job_id_2'}
This method puts the job on it's dependency's waitlist. This method puts the job on it's dependency's waitlist.
""" """
# TODO: This can probably be pipelined # TODO: This can probably be pipelined
self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id) self.connection.sadd(Job.waitlist_key_for(self._dependency_id), self.id)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -247,7 +247,7 @@ class Queue(object):
"""Enqueues all jobs in the waitlist and clears it""" """Enqueues all jobs in the waitlist and clears it"""
# TODO: can probably be pipelined # TODO: can probably be pipelined
while True: while True:
job_id = as_text(self.connection.lpop(job.waitlist_key)) job_id = as_text(self.connection.spop(job.waitlist_key))
if job_id is None: if job_id is None:
break break
waitlisted_job = Job.fetch(job_id, connection=self.connection) waitlisted_job = Job.fetch(job_id, connection=self.connection)

@ -289,4 +289,4 @@ class TestJob(RQTestCase):
job._dependency_id = 'id' job._dependency_id = 'id'
job.save() job.save()
job.register_dependency() job.register_dependency()
self.assertEqual(as_text(self.testconn.lpop('rq:job:id:waitlist')), job.id) self.assertEqual(as_text(self.testconn.spop('rq:job:id:waitlist')), job.id)

@ -304,7 +304,7 @@ class TestQueue(RQTestCase):
# After waitlist is enqueued, job_1 and job_2 should be in queue # After waitlist is enqueued, job_1 and job_2 should be in queue
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
q.enqueue_waitlist(parent_job) q.enqueue_waitlist(parent_job)
self.assertEqual(q.job_ids, [job_1.id, job_2.id]) self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):

Loading…
Cancel
Save