enqueue_dependents now uses origin queue to enqueue

Current implementation is using the first job queue
to insert the dependent jobs. This makes RQ to ignore
the original queue where the jobs were enqueued.

With this change, we will use job origin attribute to
fetch the original queue, then insert the dependent
job to the fetched queue.

fixes issue #512
main
Javier Lopez 10 years ago
parent e6a15c57b3
commit b4f157516e

@ -289,15 +289,22 @@ class Queue(object):
# TODO: can probably be pipelined # TODO: can probably be pipelined
from .registry import DeferredJobRegistry from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.name, self.connection)
while True: while True:
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = self.job_class.fetch(job_id, connection=self.connection) dependent = self.job_class.fetch(job_id, connection=self.connection)
registry.remove(dependent) registry = DeferredJobRegistry(dependent.origin, self.connection)
with self.connection._pipeline() as pipeline:
registry.remove(dependent, pipeline=pipeline)
if dependent.origin == self.name:
self.enqueue_job(dependent) self.enqueue_job(dependent)
else:
queue = Queue(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent)
pipeline.execute()
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""

Loading…
Cancel
Save