diff --git a/rq/queue.py b/rq/queue.py index e286dec..8d01e75 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -265,21 +265,21 @@ class Queue(object): If Queue is instantiated with async=False, job is executed immediately. """ - if not pipeline: - pipeline = self.connection._pipeline() + pipe = pipeline if pipeline is not None else self.connection._pipeline() # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + pipe.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipe) job.origin = self.name job.enqueued_at = utcnow() if job.timeout is None: job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.save(pipeline=pipe) - pipeline.execute() + if pipeline is None: + pipe.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -291,8 +291,6 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - - while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: @@ -306,6 +304,7 @@ class Queue(object): else: queue = Queue(name=dependent.origin, connection=self.connection) queue.enqueue_job(dependent, pipeline=pipeline) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue."""