diff --git a/rq/job.py b/rq/job.py index fbf7d5a..d815c5b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -544,8 +544,14 @@ class Job(object): rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} - This method adds the current job in its dependency's dependents set. + This method adds the job in its dependency's dependents set + and adds the job to DeferredJobRegistry. """ + from .registry import DeferredJobRegistry + + registry = DeferredJobRegistry(self.origin, connection=self.connection) + registry.add(self, pipeline=pipeline) + connection = pipeline if pipeline is not None else self.connection connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) diff --git a/rq/queue.py b/rq/queue.py index d596106..e2c358c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -279,11 +279,16 @@ class Queue(object): def enqueue_dependents(self, job): """Enqueues all jobs in the given job's dependents set and clears it.""" # TODO: can probably be pipelined + from .registry import DeferredJobRegistry + + registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) + registry.remove(dependent) self.enqueue_job(dependent) def pop_job_id(self): diff --git a/rq/registry.py b/rq/registry.py index b4cf43f..08798eb 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -24,7 +24,7 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, ttl, pipeline=None): + def add(self, job, ttl=0, pipeline=None): """Adds a job to a registry with expiry time of now + ttl.""" score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: @@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry): """ score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) + + +class DeferredJobRegistry(BaseRegistry): + """ + Registry of deferred jobs (waiting for another job to finish). + """ + + def __init__(self, name='default', connection=None): + super(DeferredJobRegistry, self).__init__(name, connection) + self.key = 'rq:deferred:%s' % name + + def cleanup(self): + """This method is only here to prevent errors because this method is + automatically called by `count()` and `get_job_ids()` methods + implemented in BaseRegistry.""" + pass diff --git a/tests/test_job.py b/tests/test_job.py index 34859a7..b7854b3 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,6 +7,7 @@ from datetime import datetime from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job +from rq.registry import DeferredJobRegistry from rq.queue import Queue from rq.utils import utcformat @@ -331,12 +332,18 @@ class TestJob(RQTestCase): self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) def test_register_dependency(self): - """Test that jobs updates the correct job dependents.""" - job = Job.create(func=say_hello) + """Ensure dependency registration works properly.""" + origin = 'some_queue' + registry = DeferredJobRegistry(origin, self.testconn) + + job = Job.create(func=say_hello, origin=origin) job._dependency_id = 'id' job.save() + + self.assertEqual(registry.get_job_ids(), []) job.register_dependency() self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) + self.assertEqual(registry.get_job_ids(), [job.id]) def test_cancel(self): """job.cancel() deletes itself & dependents mapping from Redis.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index f5edcbf..369b3f1 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function, from rq import get_failed_queue, Queue from rq.exceptions import InvalidJobOperationError from rq.job import Job, JobStatus +from rq.registry import DeferredJobRegistry from rq.worker import Worker from tests import RQTestCase @@ -319,23 +320,28 @@ class TestQueue(RQTestCase): self.assertEquals(len(Queue.all()), 3) def test_enqueue_dependents(self): - """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue.""" + """Enqueueing dependent jobs pushes all jobs in the depends set to the queue + and removes them from DeferredJobQueue.""" q = Queue() parent_job = Job.create(func=say_hello) parent_job.save() - job_1 = Job.create(func=say_hello, depends_on=parent_job) - job_1.save() - job_1.register_dependency() - job_2 = Job.create(func=say_hello, depends_on=parent_job) - job_2.save() - job_2.register_dependency() + job_1 = q.enqueue(say_hello, depends_on=parent_job) + job_2 = q.enqueue(say_hello, depends_on=parent_job) + registry = DeferredJobRegistry(q.name, connection=self.testconn) + self.assertEqual( + set(registry.get_job_ids()), + set([job_1.id, job_2.id]) + ) # After dependents is enqueued, job_1 and job_2 should be in queue self.assertEqual(q.job_ids, []) q.enqueue_dependents(parent_job) - self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id])) + self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id])) self.assertFalse(self.testconn.exists(parent_job.dependents_key)) + # DeferredJobRegistry should also be empty + self.assertEqual(registry.get_job_ids(), []) + def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued diff --git a/tests/test_registry.py b/tests/test_registry.py index 26470e3..3798a8f 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -5,7 +5,8 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import FinishedJobRegistry, StartedJobRegistry +from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, + StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -135,3 +136,19 @@ class TestFinishedJobRegistry(RQTestCase): failed_job = queue.enqueue(div_by_zero) worker.perform_job(failed_job) self.assertEqual(self.registry.get_job_ids(), [job.id]) + + +class TestRegistry(RQTestCase): + + def setUp(self): + super(TestRegistry, self).setUp() + self.registry = DeferredJobRegistry(connection=self.testconn) + + def test_add(self): + """Adding a job to DeferredJobsRegistry.""" + job = Job() + self.registry.add(job) + self.assertEqual( + self.testconn.zrange(self.registry.key, 0, -1), + [job.id] + ) \ No newline at end of file