diff --git a/rq/queue.py b/rq/queue.py index 38aa05f..8d01e75 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -260,24 +260,26 @@ class Queue(object): description=description, depends_on=depends_on, job_id=job_id, at_front=at_front) - def enqueue_job(self, job, at_front=False): + def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. If Queue is instantiated with async=False, job is executed immediately. """ - with self.connection._pipeline() as pipeline: - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + pipe = pipeline if pipeline is not None else self.connection._pipeline() - job.origin = self.name - job.enqueued_at = utcnow() + # Add Queue key set + pipe.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipe) - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.origin = self.name + job.enqueued_at = utcnow() - pipeline.execute() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipe) + + if pipeline is None: + pipe.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -289,15 +291,20 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.name, self.connection) - while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) - registry.remove(dependent) - self.enqueue_job(dependent) + registry = DeferredJobRegistry(dependent.origin, self.connection) + with self.connection._pipeline() as pipeline: + registry.remove(dependent, pipeline=pipeline) + if dependent.origin == self.name: + self.enqueue_job(dependent, pipeline=pipeline) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent, pipeline=pipeline) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/tests/test_job.py b/tests/test_job.py index 6971d7b..1c92d84 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime import time -from tests import RQTestCase +from tests import RQTestCase, fixtures from tests.helpers import strip_microseconds from rq.compat import PY2, as_text @@ -16,7 +16,7 @@ from rq.registry import DeferredJobRegistry from rq.utils import utcformat from rq.worker import Worker -from . import fixtures + try: from cPickle import loads, dumps diff --git a/tests/test_queue.py b/tests/test_queue.py index bebdac7..5d87ddf 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -350,6 +350,45 @@ class TestQueue(RQTestCase): # DeferredJobRegistry should also be empty self.assertEqual(registry.get_job_ids(), []) + + def test_enqueue_dependents_on_multiple_queues(self): + """Enqueueing dependent jobs on multiple queues pushes jobs in the queues + and removes them from DeferredJobRegistry for each different queue.""" + q_1 = Queue("queue_1") + q_2 = Queue("queue_2") + parent_job = Job.create(func=say_hello) + parent_job.save() + job_1 = q_1.enqueue(say_hello, depends_on=parent_job) + job_2 = q_2.enqueue(say_hello, depends_on=parent_job) + + # Each queue has its own DeferredJobRegistry + registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn) + self.assertEqual( + set(registry_1.get_job_ids()), + set([job_1.id]) + ) + registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) + self.assertEqual( + set(registry_2.get_job_ids()), + set([job_2.id]) + ) + + # After dependents is enqueued, job_1 on queue_1 and + # job_2 should be in queue_2 + self.assertEqual(q_1.job_ids, []) + self.assertEqual(q_2.job_ids, []) + q_1.enqueue_dependents(parent_job) + q_2.enqueue_dependents(parent_job) + self.assertEqual(set(q_1.job_ids), set([job_1.id])) + self.assertEqual(set(q_2.job_ids), set([job_2.id])) + self.assertFalse(self.testconn.exists(parent_job.dependents_key)) + + # DeferredJobRegistry should also be empty + self.assertEqual(registry_1.get_job_ids(), []) + self.assertEqual(registry_2.get_job_ids(), []) + + + def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued