diff --git a/rq/queue.py b/rq/queue.py index 2120fe5..e286dec 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -260,24 +260,26 @@ class Queue(object): description=description, depends_on=depends_on, job_id=job_id, at_front=at_front) - def enqueue_job(self, job, at_front=False): + def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. If Queue is instantiated with async=False, job is executed immediately. """ - with self.connection._pipeline() as pipeline: - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + if not pipeline: + pipeline = self.connection._pipeline() - job.origin = self.name - job.enqueued_at = utcnow() + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipeline) - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.origin = self.name + job.enqueued_at = utcnow() - pipeline.execute() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipeline) + + pipeline.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -300,11 +302,10 @@ class Queue(object): with self.connection._pipeline() as pipeline: registry.remove(dependent, pipeline=pipeline) if dependent.origin == self.name: - self.enqueue_job(dependent) + self.enqueue_job(dependent, pipeline=pipeline) else: queue = Queue(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent) - pipeline.execute() + queue.enqueue_job(dependent, pipeline=pipeline) def pop_job_id(self): """Pops a given job ID from this Redis queue."""