|
|
@ -161,14 +161,18 @@ class Queue(object):
|
|
|
|
if self.job_class.exists(job_id, self.connection):
|
|
|
|
if self.job_class.exists(job_id, self.connection):
|
|
|
|
self.connection.rpush(self.key, job_id)
|
|
|
|
self.connection.rpush(self.key, job_id)
|
|
|
|
|
|
|
|
|
|
|
|
def push_job_id(self, job_id, pipeline=None):
|
|
|
|
def push_job_id(self, job_id, pipeline=None, skip_queue=False):
|
|
|
|
"""Pushes a job ID on the corresponding Redis queue."""
|
|
|
|
"""Pushes a job ID on the corresponding Redis queue.
|
|
|
|
|
|
|
|
'skip_queue' allows you to push the job onto the front instead of the back of the queue"""
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
connection = pipeline if pipeline is not None else self.connection
|
|
|
|
connection.rpush(self.key, job_id)
|
|
|
|
if skip_queue:
|
|
|
|
|
|
|
|
connection.lpush(self.key, job_id)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
connection.rpush(self.key, job_id)
|
|
|
|
|
|
|
|
|
|
|
|
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
|
|
|
|
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
|
|
|
|
result_ttl=None, description=None, depends_on=None,
|
|
|
|
result_ttl=None, description=None, depends_on=None,
|
|
|
|
job_id=None):
|
|
|
|
job_id=None, skip_queue=False):
|
|
|
|
"""Creates a job to represent the delayed function call and enqueues
|
|
|
|
"""Creates a job to represent the delayed function call and enqueues
|
|
|
|
it.
|
|
|
|
it.
|
|
|
|
|
|
|
|
|
|
|
@ -204,7 +208,7 @@ class Queue(object):
|
|
|
|
except WatchError:
|
|
|
|
except WatchError:
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
return self.enqueue_job(job)
|
|
|
|
return self.enqueue_job(job, skip_queue=skip_queue)
|
|
|
|
|
|
|
|
|
|
|
|
def enqueue(self, f, *args, **kwargs):
|
|
|
|
def enqueue(self, f, *args, **kwargs):
|
|
|
|
"""Creates a job to represent the delayed function call and enqueues
|
|
|
|
"""Creates a job to represent the delayed function call and enqueues
|
|
|
@ -231,6 +235,7 @@ class Queue(object):
|
|
|
|
result_ttl = kwargs.pop('result_ttl', None)
|
|
|
|
result_ttl = kwargs.pop('result_ttl', None)
|
|
|
|
depends_on = kwargs.pop('depends_on', None)
|
|
|
|
depends_on = kwargs.pop('depends_on', None)
|
|
|
|
job_id = kwargs.pop('job_id', None)
|
|
|
|
job_id = kwargs.pop('job_id', None)
|
|
|
|
|
|
|
|
skip_queue = kwargs.pop('skip_queue', False)
|
|
|
|
|
|
|
|
|
|
|
|
if 'args' in kwargs or 'kwargs' in kwargs:
|
|
|
|
if 'args' in kwargs or 'kwargs' in kwargs:
|
|
|
|
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
|
|
|
|
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
|
|
|
@ -240,9 +245,9 @@ class Queue(object):
|
|
|
|
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
|
|
|
|
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
|
|
|
|
timeout=timeout, result_ttl=result_ttl,
|
|
|
|
timeout=timeout, result_ttl=result_ttl,
|
|
|
|
description=description, depends_on=depends_on,
|
|
|
|
description=description, depends_on=depends_on,
|
|
|
|
job_id=job_id)
|
|
|
|
job_id=job_id, skip_queue=skip_queue)
|
|
|
|
|
|
|
|
|
|
|
|
def enqueue_job(self, job, set_meta_data=True):
|
|
|
|
def enqueue_job(self, job, set_meta_data=True, skip_queue=False):
|
|
|
|
"""Enqueues a job for delayed execution.
|
|
|
|
"""Enqueues a job for delayed execution.
|
|
|
|
|
|
|
|
|
|
|
|
If the `set_meta_data` argument is `True` (default), it will update
|
|
|
|
If the `set_meta_data` argument is `True` (default), it will update
|
|
|
@ -262,7 +267,7 @@ class Queue(object):
|
|
|
|
job.save()
|
|
|
|
job.save()
|
|
|
|
|
|
|
|
|
|
|
|
if self._async:
|
|
|
|
if self._async:
|
|
|
|
self.push_job_id(job.id)
|
|
|
|
self.push_job_id(job.id, skip_queue=skip_queue)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
job.perform()
|
|
|
|
job.perform()
|
|
|
|
job.save()
|
|
|
|
job.save()
|
|
|
|