@@ -4,15 +4,14 @@ from __future__ import (absolute_import, division, print_function,
 
 import uuid
 
-from .connections import resolve_connection
-from .job import Job, Status
-from .utils import import_attribute, utcnow
+from redis import WatchError
 
+from .compat import as_text, string_types, total_ordering
+from .connections import resolve_connection
 from .exceptions import (DequeueTimeout, InvalidJobOperationError,
                          NoSuchJobError, UnpickleError)
-from .compat import total_ordering, string_types, as_text
-
-from redis import WatchError
+from .job import Job, JobStatus
+from .utils import import_attribute, utcnow
 
 
 def get_failed_queue(connection=None):
@@ -149,7 +148,7 @@ class Queue(object):
 
     def compact(self):
         """Removes all "dead" jobs from the queue by cycling through it, while
-        guarantueeing FIFO semantics.
+        guaranteeing FIFO semantics.
         """
         COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
 
@@ -161,14 +160,18 @@ class Queue(object):
             if self.job_class.exists(job_id, self.connection):
                 self.connection.rpush(self.key, job_id)
 
-    def push_job_id(self, job_id, pipeline=None):
-        """Pushes a job ID on the corresponding Redis queue."""
+    def push_job_id(self, job_id, pipeline=None, at_front=False):
+        """Pushes a job ID on the corresponding Redis queue.
+        'at_front' allows you to push the job onto the front instead of the back of the queue"""
         connection = pipeline if pipeline is not None else self.connection
-        connection.rpush(self.key, job_id)
+        if at_front:
+            connection.lpush(self.key, job_id)
+        else:
+            connection.rpush(self.key, job_id)
 
     def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                      result_ttl=None, ttl=None, description=None,
-                     depends_on=None, job_id=None):
+                     depends_on=None, job_id=None, at_front=False):
         """Creates a job to represent the delayed function call and enqueues
         it.
 
@@ -178,11 +181,11 @@ class Queue(object):
         """
         timeout = timeout or self._default_timeout
 
-        # TODO: job with dependency shouldn't have "queued" as status
-        job = self.job_class.create(func, args, kwargs, connection=self.connection,
-                                    result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED,
-                                    description=description, depends_on=depends_on, timeout=timeout,
-                                    id=job_id)
+        job = self.job_class.create(
+            func, args, kwargs, connection=self.connection,
+            result_ttl=result_ttl, status=JobStatus.QUEUED,
+            description=description, depends_on=depends_on,
+            timeout=timeout, id=job_id, origin=self.name)
 
         # If job depends on an unfinished job, register itself on it's
         # parent's dependents instead of enqueueing it.
@@ -195,7 +198,8 @@ class Queue(object):
                 while True:
                     try:
                         pipe.watch(depends_on.key)
-                        if depends_on.get_status() != Status.FINISHED:
+                        if depends_on.get_status() != JobStatus.FINISHED:
+                            job.set_status(JobStatus.DEFERRED)
                             job.register_dependency(pipeline=pipe)
                             job.save(pipeline=pipe)
                             pipe.execute()
@@ -204,7 +208,7 @@ class Queue(object):
                     except WatchError:
                         continue
 
-        return self.enqueue_job(job)
+        return self.enqueue_job(job, at_front=at_front)
 
     def enqueue(self, f, *args, **kwargs):
         """Creates a job to represent the delayed function call and enqueues
@@ -232,6 +236,7 @@ class Queue(object):
         ttl = kwargs.pop('ttl', None)
         depends_on = kwargs.pop('depends_on', None)
         job_id = kwargs.pop('job_id', None)
+        at_front = kwargs.pop('at_front', False)
 
         if 'args' in kwargs or 'kwargs' in kwargs:
             assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
@@ -241,29 +246,29 @@ class Queue(object):
         return self.enqueue_call(func=f, args=args, kwargs=kwargs,
                                  timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                                  description=description, depends_on=depends_on,
-                                 job_id=job_id)
+                                 job_id=job_id, at_front=at_front)
 
-    def enqueue_job(self, job, set_meta_data=True):
+    def enqueue_job(self, job, at_front=False):
         """Enqueues a job for delayed execution.
 
-        If the `set_meta_data` argument is `True` (default), it will update
-        the properties `origin` and `enqueued_at`.
-
         If Queue is instantiated with async=False, job is executed immediately.
         """
-        # Add Queue key set
-        self.connection.sadd(self.redis_queues_keys, self.key)
+        with self.connection._pipeline() as pipeline:
+            # Add Queue key set
+            self.connection.sadd(self.redis_queues_keys, self.key)
+            job.set_status(JobStatus.QUEUED, pipeline=pipeline)
 
-        if set_meta_data:
             job.origin = self.name
             job.enqueued_at = utcnow()
 
-        if job.timeout is None:
-            job.timeout = self.DEFAULT_TIMEOUT
-        job.save()
+            if job.timeout is None:
+                job.timeout = self.DEFAULT_TIMEOUT
+
+            job.save(pipeline=pipeline)
+            pipeline.execute()
 
         if self._async:
-            self.push_job_id(job.id)
+            self.push_job_id(job.id, at_front=at_front)
         else:
             job.perform()
             job.save()
@@ -272,11 +277,16 @@ class Queue(object):
     def enqueue_dependents(self, job):
         """Enqueues all jobs in the given job's dependents set and clears it."""
         # TODO: can probably be pipelined
+        from .registry import DeferredJobRegistry
+
+        registry = DeferredJobRegistry(self.name, self.connection)
+
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
+            registry.remove(dependent)
             self.enqueue_job(dependent)
 
     def pop_job_id(self):
@@ -391,19 +401,25 @@ class Queue(object):
 
 class FailedQueue(Queue):
     def __init__(self, connection=None):
-        super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
+        super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
 
     def quarantine(self, job, exc_info):
         """Puts the given Job in quarantine (i.e. put it on the failed
         queue).
 
         This is different from normal job enqueueing, since certain meta data
         must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
         data must be inserted (`ended_at` and `exc_info`).
         """
-        job.ended_at = utcnow()
-        job.exc_info = exc_info
-        return self.enqueue_job(job, set_meta_data=False)
+        with self.connection._pipeline() as pipeline:
+            # Add Queue key set
+            self.connection.sadd(self.redis_queues_keys, self.key)
+            job.ended_at = utcnow()
+            job.exc_info = exc_info
+            job.save(pipeline=pipeline)
+            self.push_job_id(job.id, pipeline=pipeline)
+            pipeline.execute()
+        return job
 
     def requeue(self, job_id):
         """Requeues the job with the given job ID."""
@@ -418,7 +434,7 @@ class FailedQueue(Queue):
         if self.remove(job) == 0:
             raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
 
-        job.set_status(Status.QUEUED)
+        job.set_status(JobStatus.QUEUED)
        job.exc_info = None
         q = Queue(job.origin, connection=self.connection)
         q.enqueue_job(job)
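
Usage note (not part of the patch): a minimal sketch of the new at_front flag
introduced above, assuming a local Redis instance; say_hello is a placeholder
task function, not something from this diff:

    from redis import Redis
    from rq import Queue

    def say_hello(name):
        # placeholder task, used only for illustration
        return 'Hi there, %s!' % name

    q = Queue(connection=Redis())
    q.enqueue(say_hello, 'Alice')               # default: RPUSH appends to the back
    q.enqueue(say_hello, 'Bob', at_front=True)  # new: LPUSH puts the job at the front

Because workers dequeue from the left end of the Redis list (LPOP/BLPOP),
Bob's job is picked up before Alice's even though it was enqueued later.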