diff --git a/rq/job.py b/rq/job.py index 97d5161..775abb2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -232,9 +232,9 @@ class Job(object): return b'rq:job:' + job_id.encode('utf-8') @classmethod - def waitlist_key_for(cls, job_id): + def dependents_key_for(cls, job_id): """The Redis key that is used to store job hash under.""" - return 'rq:job:%s:waitlist' % (job_id,) + return 'rq:job:%s:dependents' % (job_id,) @property def key(self): @@ -242,9 +242,9 @@ class Job(object): return self.key_for(self.id) @property - def waitlist_key(self): + def dependents_key(self): """The Redis key that is used to store job hash under.""" - return self.waitlist_key_for(self.id) + return self.dependents_key_for(self.id) @property # noqa def job_tuple(self): @@ -422,16 +422,17 @@ class Job(object): connection.expire(self.key, ttl) def register_dependency(self): - """Jobs may have a waitlist. Jobs in this waitlist are enqueued - only if the dependency job is successfully performed. We maintain this - waitlist in a Redis set, with key that looks something like: - - rq:job:job_id:waitlist = {'job_id_1', 'job_id_2'} - - This method puts the job on it's dependency's waitlist. + """Jobs may have dependencies. Jobs are enqueued only if the job they + depend on is successfully performed. We record this relation as + a reverse dependency (a Redis set), with a key that looks something + like: + + rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} + + This method adds the current job in its dependency's dependents set. """ # TODO: This can probably be pipelined - self.connection.sadd(Job.waitlist_key_for(self._dependency_id), self.id) + self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) def __str__(self): return '' % (self.id, self.description) diff --git a/rq/queue.py b/rq/queue.py index 70f4094..2cf2ccd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -156,7 +156,7 @@ class Queue(object): description=description, depends_on=depends_on) # If job depends on an unfinished job, register itself on it's - # parent's waitlist instead of enqueueing it. + # parent's dependents instead of enqueueing it. # If WatchError is raised in the process, that means something else is # modifying the dependency. In this case we simply retry if depends_on is not None: @@ -243,15 +243,15 @@ class Queue(object): job.save() return job - def enqueue_waitlist(self, job): - """Enqueues all jobs in the waitlist and clears it""" + def enqueue_dependents(self, job): + """Enqueues all jobs in the given job's dependents set and clears it.""" # TODO: can probably be pipelined while True: - job_id = as_text(self.connection.spop(job.waitlist_key)) + job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break - waitlisted_job = Job.fetch(job_id, connection=self.connection) - self.enqueue_job(waitlisted_job) + dependent = Job.fetch(job_id, connection=self.connection) + self.enqueue_job(dependent) def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/rq/worker.py b/rq/worker.py index e8b9e69..ddd71ab 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -327,7 +327,7 @@ class Worker(object): self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) if job.status == 'finished': - queue.enqueue_waitlist(job) + queue.enqueue_dependents(job) did_perform_work = True finally: diff --git a/tests/test_job.py b/tests/test_job.py index 73966db..6b01d30 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -284,9 +284,9 @@ class TestJob(RQTestCase): self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) def test_register_dependency(self): - """Test that jobs updates the correct job waitlist""" + """Test that jobs updates the correct job dependents.""" job = Job.create(func=say_hello) job._dependency_id = 'id' job.save() job.register_dependency() - self.assertEqual(as_text(self.testconn.spop('rq:job:id:waitlist')), job.id) + self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) diff --git a/tests/test_queue.py b/tests/test_queue.py index c7602a7..716c3dc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -289,8 +289,8 @@ class TestQueue(RQTestCase): # Queue.all() should still report the empty queues self.assertEquals(len(Queue.all()), 3) - def test_enqueue_waitlist(self): - """Enqueueing a waitlist pushes all jobs in waitlist to queue""" + def test_enqueue_dependents(self): + """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue.""" q = Queue() parent_job = Job.create(func=say_hello) parent_job.save() @@ -301,11 +301,11 @@ class TestQueue(RQTestCase): job_2.save() job_2.register_dependency() - # After waitlist is enqueued, job_1 and job_2 should be in queue + # After dependents is enqueued, job_1 and job_2 should be in queue self.assertEqual(q.job_ids, []) - q.enqueue_waitlist(parent_job) + q.enqueue_dependents(parent_job) self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id])) - self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) + self.assertFalse(self.testconn.exists(parent_job.dependents_key)) def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished""" diff --git a/tests/test_worker.py b/tests/test_worker.py index a681a79..fe77c04 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -236,7 +236,7 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_failed, True) def test_job_dependency(self): - """Enqueue waitlisted jobs only if their parents don't fail""" + """Enqueue dependent jobs only if their parents don't fail""" q = Queue() w = Worker([q]) parent_job = q.enqueue(say_hello)