|
|
|
@ -265,21 +265,21 @@ class Queue(object):
|
|
|
|
|
|
|
|
|
|
If Queue is instantiated with async=False, job is executed immediately.
|
|
|
|
|
"""
|
|
|
|
|
if not pipeline:
|
|
|
|
|
pipeline = self.connection._pipeline()
|
|
|
|
|
pipe = pipeline if pipeline is not None else self.connection._pipeline()
|
|
|
|
|
|
|
|
|
|
# Add Queue key set
|
|
|
|
|
self.connection.sadd(self.redis_queues_keys, self.key)
|
|
|
|
|
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
|
|
|
|
|
pipe.sadd(self.redis_queues_keys, self.key)
|
|
|
|
|
job.set_status(JobStatus.QUEUED, pipeline=pipe)
|
|
|
|
|
|
|
|
|
|
job.origin = self.name
|
|
|
|
|
job.enqueued_at = utcnow()
|
|
|
|
|
|
|
|
|
|
if job.timeout is None:
|
|
|
|
|
job.timeout = self.DEFAULT_TIMEOUT
|
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
|
job.save(pipeline=pipe)
|
|
|
|
|
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
if pipeline is None:
|
|
|
|
|
pipe.execute()
|
|
|
|
|
|
|
|
|
|
if self._async:
|
|
|
|
|
self.push_job_id(job.id, at_front=at_front)
|
|
|
|
@ -291,8 +291,6 @@ class Queue(object):
|
|
|
|
|
# TODO: can probably be pipelined
|
|
|
|
|
from .registry import DeferredJobRegistry
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
job_id = as_text(self.connection.spop(job.dependents_key))
|
|
|
|
|
if job_id is None:
|
|
|
|
@ -306,6 +304,7 @@ class Queue(object):
|
|
|
|
|
else:
|
|
|
|
|
queue = Queue(name=dependent.origin, connection=self.connection)
|
|
|
|
|
queue.enqueue_job(dependent, pipeline=pipeline)
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
def pop_job_id(self):
|
|
|
|
|
"""Pops a given job ID from this Redis queue."""
|
|
|
|
|