From b4f157516e1a0fb9ac54a5d496374fb067b6bb52 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Tue, 21 Jul 2015 12:52:15 +0200 Subject: [PATCH] enqueue_dependents now uses origin queue to enqueue Current implementation is using the first job queue to insert the dependent jobs. This makes RQ to ignore the original queue where the jobs were enqueued. With this change, we will use job origin attribute to fetch the original queue, then insert the dependent job to the fetched queue. fixes issue #512 --- rq/queue.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 38aa05f..2120fe5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -289,15 +289,22 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) - registry.remove(dependent) - self.enqueue_job(dependent) + registry = DeferredJobRegistry(dependent.origin, self.connection) + with self.connection._pipeline() as pipeline: + registry.remove(dependent, pipeline=pipeline) + if dependent.origin == self.name: + self.enqueue_job(dependent) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue."""