Merge pull request #463 from conslo/skip-queue

Skip queue
main
Selwin Ong 10 years ago
commit 0a0a81da6d

@ -149,7 +149,7 @@ class Queue(object):
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
guaranteeing FIFO semantics.
"""
COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4())
@ -161,14 +161,18 @@ class Queue(object):
if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id, pipeline=None):
"""Pushes a job ID on the corresponding Redis queue."""
def push_job_id(self, job_id, pipeline=None, at_front=False):
"""Pushes a job ID on the corresponding Redis queue.
'at_front' allows you to push the job onto the front instead of the back of the queue"""
connection = pipeline if pipeline is not None else self.connection
connection.rpush(self.key, job_id)
if at_front:
connection.lpush(self.key, job_id)
else:
connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, description=None,
depends_on=None, job_id=None):
depends_on=None, job_id=None, at_front=False):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -204,7 +208,7 @@ class Queue(object):
except WatchError:
continue
return self.enqueue_job(job)
return self.enqueue_job(job, at_front=at_front)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues
@ -232,6 +236,7 @@ class Queue(object):
ttl = kwargs.pop('ttl', None)
depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
@ -241,9 +246,9 @@ class Queue(object):
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
description=description, depends_on=depends_on,
job_id=job_id)
job_id=job_id, at_front=at_front)
def enqueue_job(self, job, set_meta_data=True):
def enqueue_job(self, job, set_meta_data=True, at_front=False):
"""Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
@ -263,7 +268,7 @@ class Queue(object):
job.save()
if self._async:
self.push_job_id(job.id)
self.push_job_id(job.id, at_front=at_front)
else:
job.perform()
job.save()

@ -459,3 +459,13 @@ class TestFailedQueue(RQTestCase):
"""Ensure custom job class assignment works as expected."""
q = Queue(job_class=CustomJob)
self.assertEqual(q.job_class, CustomJob)
def test_skip_queue(self):
"""Ensure the skip_queue option functions"""
q = Queue('foo')
job1 = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)
assert q.dequeue() == job1
skip_job = q.enqueue(say_hello, at_front=True)
assert q.dequeue() == skip_job
assert q.dequeue() == job2

Loading…
Cancel
Save