From b4f157516e1a0fb9ac54a5d496374fb067b6bb52 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Tue, 21 Jul 2015 12:52:15 +0200 Subject: [PATCH 01/11] enqueue_dependents now uses origin queue to enqueue Current implementation is using the first job queue to insert the dependent jobs. This makes RQ ignore the original queue where the jobs were enqueued. With this change, we will use job origin attribute to fetch the original queue, then insert the dependent job to the fetched queue. fixes issue #512 --- rq/queue.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 38aa05f..2120fe5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -289,15 +289,22 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) - registry.remove(dependent) - self.enqueue_job(dependent) + registry = DeferredJobRegistry(dependent.origin, self.connection) + with self.connection._pipeline() as pipeline: + registry.remove(dependent, pipeline=pipeline) + if dependent.origin == self.name: + self.enqueue_job(dependent) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue.""" From c136209804bccc26531b76212b3f027bf6c51e4e Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Tue, 21 Jul 2015 13:44:18 +0200 Subject: [PATCH 02/11] Add test_enqueue_dependents_on_multiple_queues test --- tests/test_queue.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index bebdac7..01ca581 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -350,6 +350,44 @@ class 
TestQueue(RQTestCase): # DeferredJobRegistry should also be empty self.assertEqual(registry.get_job_ids(), []) + def test_enqueue_dependents_on_multiple_queues(self): + """Enqueueing dependent jobs pushes all jobs in the depends set to the queue + and removes them from DeferredJobRegistry for each different queue.""" + q_1 = Queue("queue_1") + q_2 = Queue("queue_2") + parent_job = Job.create(func=say_hello) + parent_job.save() + job_1 = q_1.enqueue(say_hello, depends_on=parent_job) + job_2 = q_2.enqueue(say_hello, depends_on=parent_job) + + # Each queue has its own DeferredJobRegistry + registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn) + self.assertEqual( + set(registry_1.get_job_ids()), + set([job_1.id]) + ) + registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) + self.assertEqual( + set(registry_2.get_job_ids()), + set([job_2.id]) + ) + + # After dependents is enqueued, job_1 on queue_1 and + # job_2 should be in queue_2 + self.assertEqual(q_1.job_ids, []) + self.assertEqual(q_2.job_ids, []) + q_1.enqueue_dependents(parent_job) + q_2.enqueue_dependents(parent_job) + self.assertEqual(set(q_1.job_ids), set([job_1.id])) + self.assertEqual(set(q_2.job_ids), set([job_2.id])) + self.assertFalse(self.testconn.exists(parent_job.dependents_key)) + + # DeferredJobRegistry should also be empty + self.assertEqual(registry_1.get_job_ids(), []) + self.assertEqual(registry_2.get_job_ids(), []) + + + def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued From c76c59133bfb4ff82d398b85f4f8e796343efaf0 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Sun, 9 Aug 2015 17:45:21 +0200 Subject: [PATCH 03/11] Fix ValueError exception on relative import test_job raised an exception: ValueError: Attempted relative import in non-package --- tests/test_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/tests/test_job.py b/tests/test_job.py index 6971d7b..1c92d84 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime import time -from tests import RQTestCase +from tests import RQTestCase, fixtures from tests.helpers import strip_microseconds from rq.compat import PY2, as_text @@ -16,7 +16,7 @@ from rq.registry import DeferredJobRegistry from rq.utils import utcformat from rq.worker import Worker -from . import fixtures + try: from cPickle import loads, dumps From 8a6bc08e4779b6088c4bbe5d3e2182d368615776 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Mon, 24 Aug 2015 17:13:33 +0200 Subject: [PATCH 04/11] Added pipeline named argument to enqueue_job --- rq/queue.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 2120fe5..e286dec 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -260,24 +260,26 @@ class Queue(object): description=description, depends_on=depends_on, job_id=job_id, at_front=at_front) - def enqueue_job(self, job, at_front=False): + def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. If Queue is instantiated with async=False, job is executed immediately. 
""" - with self.connection._pipeline() as pipeline: - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + if not pipeline: + pipeline = self.connection._pipeline() - job.origin = self.name - job.enqueued_at = utcnow() + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipeline) - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.origin = self.name + job.enqueued_at = utcnow() - pipeline.execute() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipeline) + + pipeline.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -300,11 +302,10 @@ class Queue(object): with self.connection._pipeline() as pipeline: registry.remove(dependent, pipeline=pipeline) if dependent.origin == self.name: - self.enqueue_job(dependent) + self.enqueue_job(dependent, pipeline=pipeline) else: queue = Queue(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent) - pipeline.execute() + queue.enqueue_job(dependent, pipeline=pipeline) def pop_job_id(self): """Pops a given job ID from this Redis queue.""" From a2d0e4f93318217dd5ab185338ff4289b9447059 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Mon, 24 Aug 2015 17:14:28 +0200 Subject: [PATCH 05/11] Clarify test_enqueue_dependents_on_multiple_queues --- tests/test_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 01ca581..5d87ddf 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -350,8 +350,9 @@ class TestQueue(RQTestCase): # DeferredJobRegistry should also be empty self.assertEqual(registry.get_job_ids(), []) + def test_enqueue_dependents_on_multiple_queues(self): - """Enqueueing dependent jobs pushes all jobs in the depends set to the queue + """Enqueueing dependent jobs on 
multiple queues pushes jobs in the queues and removes them from DeferredJobRegistry for each different queue.""" q_1 = Queue("queue_1") q_2 = Queue("queue_2") From 4fb59a4ceb55cdfb0c283e4a68d672e7a6f22af4 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Mon, 24 Aug 2015 17:46:16 +0200 Subject: [PATCH 06/11] Ensure custom exception handlers are called when Redis is down --- rq/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 8176b27..e0c10f3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -590,7 +590,12 @@ class Worker(object): except Exception: job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) - pipeline.execute() + try: + pipeline.execute() + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass self.handle_exception(job, *sys.exc_info()) return False From 319f98a36e2b69aa4dff7efec295b0940ab9134c Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 25 Aug 2015 09:08:36 +0200 Subject: [PATCH 07/11] Update changelog for 0.5.5 --- CHANGES.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 586e104..07ba340 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,13 @@ +### 0.5.5 + +(August 25th, 2015) + +- Add support for `--exception-handler` command line flag +- Fix compatibility with click>=5.0 +- Fix maximum recursion depth problem for very large queues that contain jobs + that all fail + + ### 0.5.4 (July 8th, 2015) From 3f860f985d12b83d28b8628763f53e2673aab9c3 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 25 Aug 2015 09:08:42 +0200 Subject: [PATCH 08/11] Add .mailmap --- .mailmap | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .mailmap diff --git a/.mailmap b/.mailmap new file mode 100644 index 0000000..ee139e8 --- /dev/null +++ b/.mailmap @@ -0,0 +1,6 @@ +Cal Leeming +Mark LaPerriere +Selwin Ong +Vincent Driessen +Vincent 
Driessen +zhangliyong From c1a4780d10546f071991934c071899de8a4437a5 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 25 Aug 2015 09:12:34 +0200 Subject: [PATCH 09/11] Fix PEP8 complaints --- rq/worker.py | 6 +++--- tests/test_cli.py | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 8176b27..892f871 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -158,9 +158,9 @@ class Worker(object): if exc_handler is not None: self.push_exc_handler(exc_handler) warnings.warn( - "use of exc_handler is deprecated, pass a list to exception_handlers instead.", - DeprecationWarning - ) + "use of exc_handler is deprecated, pass a list to exception_handlers instead.", + DeprecationWarning + ) elif isinstance(exception_handlers, list): for h in exception_handlers: self.push_exc_handler(h) diff --git a/tests/test_cli.py b/tests/test_cli.py index 1a812ae..7c806b9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -120,4 +120,3 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 1) self.assertIn("Duration must be an integer greater than 1", result.output) - From 02c6df6a45b4afb025bc635eb289424b60ddf1ec Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 25 Aug 2015 09:15:15 +0200 Subject: [PATCH 10/11] This is 0.5.5 --- rq/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/version.py b/rq/version.py index 8754209..d1ec6a4 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.4' +VERSION = '0.5.5' From d3d9a206808c7d5f8edc783a24ba064eb7147587 Mon Sep 17 00:00:00 2001 From: Javier Lopez Date: Thu, 27 Aug 2015 11:37:34 +0200 Subject: [PATCH 11/11] Do not call pipeline.execute() if pipeline is given as argument --- rq/queue.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index e286dec..8d01e75 100644 --- 
a/rq/queue.py +++ b/rq/queue.py @@ -265,21 +265,21 @@ class Queue(object): If Queue is instantiated with async=False, job is executed immediately. """ - if not pipeline: - pipeline = self.connection._pipeline() + pipe = pipeline if pipeline is not None else self.connection._pipeline() # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + pipe.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipe) job.origin = self.name job.enqueued_at = utcnow() if job.timeout is None: job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.save(pipeline=pipe) - pipeline.execute() + if pipeline is None: + pipe.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -291,8 +291,6 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - - while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: @@ -306,6 +304,7 @@ class Queue(object): else: queue = Queue(name=dependent.origin, connection=self.connection) queue.enqueue_job(dependent, pipeline=pipeline) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue."""