Simplify FailedQueue.quarantine and ensure that a deferred job's status is set to Queued when enqueued.

main
Selwin Ong 10 years ago
parent 7fd2ac8ca6
commit 9320496402

@ -248,24 +248,25 @@ class Queue(object):
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front) job_id=job_id, at_front=at_front)
def enqueue_job(self, job, set_meta_data=True, at_front=False): def enqueue_job(self, job, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`.
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with async=False, job is executed immediately.
""" """
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key) with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
if set_meta_data:
job.origin = self.name job.origin = self.name
job.enqueued_at = utcnow() job.enqueued_at = utcnow()
if job.timeout is None: if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT job.timeout = self.DEFAULT_TIMEOUT
job.save() job.save(pipeline=pipeline)
pipeline.execute()
if self._async: if self._async:
self.push_job_id(job.id, at_front=at_front) self.push_job_id(job.id, at_front=at_front)
@ -401,14 +402,20 @@ class FailedQueue(Queue):
def quarantine(self, job, exc_info): def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed """Puts the given Job in quarantine (i.e. put it on the failed
queue). queue).
This is different from normal job enqueueing, since certain meta data
must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
data must be inserted (`ended_at` and `exc_info`).
""" """
job.ended_at = utcnow()
job.exc_info = exc_info with self.connection._pipeline() as pipeline:
return self.enqueue_job(job, set_meta_data=False) # Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.ended_at = utcnow()
job.exc_info = exc_info
job.save(pipeline=pipeline)
self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute()
return job
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""

Loading…
Cancel
Save