diff --git a/rq/queue.py b/rq/queue.py index 38aa05f..2120fe5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -289,15 +289,22 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) - registry.remove(dependent) - self.enqueue_job(dependent) + registry = DeferredJobRegistry(dependent.origin, self.connection) + with self.connection._pipeline() as pipeline: + registry.remove(dependent, pipeline=pipeline) + if dependent.origin == self.name: + self.enqueue_job(dependent) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue."""