From d4b72d330d05d2a988de19056e3ec2d5794c1344 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:04:57 -0500 Subject: [PATCH 1/5] test for skip_queue mechanics --- tests/test_queue.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index ed42204..8bfde6d 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -459,3 +459,13 @@ class TestFailedQueue(RQTestCase): """Ensure custom job class assignment works as expected.""" q = Queue(job_class=CustomJob) self.assertEqual(q.job_class, CustomJob) + + def test_skip_queue(self): + """Ensure the skip_queue option functions""" + q = Queue('foo') + job1 = q.enqueue(say_hello) + job2 = q.enqueue(say_hello) + assert q.dequeue() == job1 + skip_job = q.enqueue(say_hello, skip_queue=True) + assert q.dequeue() == skip_job + assert q.dequeue() == job2 From f60e4884df50db2f65e5a35bd602c4074981cbab Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:05:12 -0500 Subject: [PATCH 2/5] a comment typo --- rq/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 7dfb0f8..a9049dd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -149,7 +149,7 @@ class Queue(object): def compact(self): """Removes all "dead" jobs from the queue by cycling through it, while - guarantueeing FIFO semantics. + guaranteeing FIFO semantics. """ COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4()) From ac61f502a1ddb83aa56c761a9ef4adddcd222ee6 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Tue, 9 Dec 2014 12:05:19 -0500 Subject: [PATCH 3/5] skip_queue functionality --- rq/queue.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a9049dd..ddb31e6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -161,14 +161,18 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id, pipeline=None): - """Pushes a job ID on the corresponding Redis queue.""" + def push_job_id(self, job_id, pipeline=None, skip_queue=False): + """Pushes a job ID on the corresponding Redis queue. + 'skip_queue' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection - connection.rpush(self.key, job_id) + if skip_queue: + connection.lpush(self.key, job_id) + else: + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None, - job_id=None): + job_id=None, skip_queue=False): """Creates a job to represent the delayed function call and enqueues it. @@ -204,7 +208,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job) + return self.enqueue_job(job, skip_queue=skip_queue) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -231,6 +235,7 @@ class Queue(object): result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) + skip_queue = kwargs.pop('skip_queue', False) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -240,9 +245,9 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, description=description, depends_on=depends_on, - job_id=job_id) + job_id=job_id, skip_queue=skip_queue) - def enqueue_job(self, job, set_meta_data=True): + def enqueue_job(self, job, set_meta_data=True, skip_queue=False): """Enqueues a job for delayed execution. If the `set_meta_data` argument is `True` (default), it will update @@ -262,7 +267,7 @@ class Queue(object): job.save() if self._async: - self.push_job_id(job.id) + self.push_job_id(job.id, skip_queue=skip_queue) else: job.perform() job.save() From db75958ad23d1ae16a028210a04bcd03e3a540b2 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Fri, 12 Dec 2014 12:25:33 -0500 Subject: [PATCH 4/5] use 'at_front' instead of 'skip_queue' --- rq/queue.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index ddb31e6..847319f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -161,18 +161,18 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id, pipeline=None, skip_queue=False): + def push_job_id(self, job_id, pipeline=None, at_front=False): """Pushes a job ID on the corresponding Redis queue. - 'skip_queue' allows you to push the job onto the front instead of the back of the queue""" + 'at_front' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection - if skip_queue: + if at_front: connection.lpush(self.key, job_id) else: connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None, - job_id=None, skip_queue=False): + job_id=None, at_front=False): """Creates a job to represent the delayed function call and enqueues it. @@ -208,7 +208,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job, skip_queue=skip_queue) + return self.enqueue_job(job, at_front=at_front) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -235,7 +235,7 @@ class Queue(object): result_ttl = kwargs.pop('result_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) - skip_queue = kwargs.pop('skip_queue', False) + at_front = kwargs.pop('at_front', False) if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa @@ -245,9 +245,9 @@ class Queue(object): return self.enqueue_call(func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, description=description, depends_on=depends_on, - job_id=job_id, skip_queue=skip_queue) + job_id=job_id, at_front=at_front) - def enqueue_job(self, job, set_meta_data=True, skip_queue=False): + def enqueue_job(self, job, set_meta_data=True, at_front=False): """Enqueues a job for delayed execution. If the `set_meta_data` argument is `True` (default), it will update @@ -267,7 +267,7 @@ class Queue(object): job.save() if self._async: - self.push_job_id(job.id, skip_queue=skip_queue) + self.push_job_id(job.id, at_front=at_front) else: job.perform() job.save() From 2667088b686ff55972a146efc05374f9f846b363 Mon Sep 17 00:00:00 2001 From: Travis Johnson Date: Sun, 14 Dec 2014 23:02:47 -0500 Subject: [PATCH 5/5] changed the kwarg, but forgot to change the test --- tests/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8bfde6d..85bbc00 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -466,6 +466,6 @@ class TestFailedQueue(RQTestCase): job1 = q.enqueue(say_hello) job2 = q.enqueue(say_hello) assert q.dequeue() == job1 - skip_job = q.enqueue(say_hello, skip_queue=True) + skip_job = q.enqueue(say_hello, at_front=True) assert q.dequeue() == skip_job assert q.dequeue() == job2