Added pipeline named argument to enqueue_job

main
Javier Lopez 9 years ago
parent c76c59133b
commit 8a6bc08e47

@ -260,24 +260,26 @@ class Queue(object):
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front) job_id=job_id, at_front=at_front)
def enqueue_job(self, job, at_front=False): def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with async=False, job is executed immediately.
""" """
with self.connection._pipeline() as pipeline: if not pipeline:
# Add Queue key set pipeline = self.connection._pipeline()
self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
job.origin = self.name # Add Queue key set
job.enqueued_at = utcnow() self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
if job.timeout is None: job.origin = self.name
job.timeout = self.DEFAULT_TIMEOUT job.enqueued_at = utcnow()
job.save(pipeline=pipeline)
pipeline.execute() if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save(pipeline=pipeline)
pipeline.execute()
if self._async: if self._async:
self.push_job_id(job.id, at_front=at_front) self.push_job_id(job.id, at_front=at_front)
@ -300,11 +302,10 @@ class Queue(object):
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
registry.remove(dependent, pipeline=pipeline) registry.remove(dependent, pipeline=pipeline)
if dependent.origin == self.name: if dependent.origin == self.name:
self.enqueue_job(dependent) self.enqueue_job(dependent, pipeline=pipeline)
else: else:
queue = Queue(name=dependent.origin, connection=self.connection) queue = Queue(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent) queue.enqueue_job(dependent, pipeline=pipeline)
pipeline.execute()
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""

Loading…
Cancel
Save