diff --git a/.mailmap b/.mailmap new file mode 100644 index 0000000..ee139e8 --- /dev/null +++ b/.mailmap @@ -0,0 +1,6 @@ +Cal Leeming +Mark LaPerriere +Selwin Ong +Vincent Driessen +Vincent Driessen +zhangliyong diff --git a/CHANGES.md b/CHANGES.md index 586e104..07ba340 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,13 @@ +### 0.5.5 + +(August 25th, 2015) + +- Add support for `--exception-handler` command line flag +- Fix compatibility with click>=5.0 +- Fix maximum recursion depth problem for very large queues that contain jobs + that all fail + + ### 0.5.4 (July 8th, 2015) diff --git a/rq/queue.py b/rq/queue.py index 38aa05f..8d01e75 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -260,24 +260,26 @@ class Queue(object): description=description, depends_on=depends_on, job_id=job_id, at_front=at_front) - def enqueue_job(self, job, at_front=False): + def enqueue_job(self, job, pipeline=None, at_front=False): """Enqueues a job for delayed execution. If Queue is instantiated with async=False, job is executed immediately. 
""" - with self.connection._pipeline() as pipeline: - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) - job.set_status(JobStatus.QUEUED, pipeline=pipeline) + pipe = pipeline if pipeline is not None else self.connection._pipeline() - job.origin = self.name - job.enqueued_at = utcnow() + # Add Queue key set + pipe.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipe) - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save(pipeline=pipeline) + job.origin = self.name + job.enqueued_at = utcnow() - pipeline.execute() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipe) + + if pipeline is None: + pipe.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -289,15 +291,20 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.name, self.connection) - while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) - registry.remove(dependent) - self.enqueue_job(dependent) + registry = DeferredJobRegistry(dependent.origin, self.connection) + with self.connection._pipeline() as pipeline: + registry.remove(dependent, pipeline=pipeline) + if dependent.origin == self.name: + self.enqueue_job(dependent, pipeline=pipeline) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent, pipeline=pipeline) + pipeline.execute() def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/rq/version.py b/rq/version.py index 8754209..d1ec6a4 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.4' +VERSION = '0.5.5' diff --git a/rq/worker.py b/rq/worker.py index 8176b27..c1c5d8e 100644 --- 
a/rq/worker.py +++ b/rq/worker.py @@ -158,9 +158,9 @@ class Worker(object): if exc_handler is not None: self.push_exc_handler(exc_handler) warnings.warn( - "use of exc_handler is deprecated, pass a list to exception_handlers instead.", - DeprecationWarning - ) + "use of exc_handler is deprecated, pass a list to exception_handlers instead.", + DeprecationWarning + ) elif isinstance(exception_handlers, list): for h in exception_handlers: self.push_exc_handler(h) @@ -590,7 +590,12 @@ class Worker(object): except Exception: job.set_status(JobStatus.FAILED, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline) - pipeline.execute() + try: + pipeline.execute() + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass self.handle_exception(job, *sys.exc_info()) return False diff --git a/tests/test_cli.py b/tests/test_cli.py index 1a812ae..7c806b9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -120,4 +120,3 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 1) self.assertIn("Duration must be an integer greater than 1", result.output) - diff --git a/tests/test_queue.py b/tests/test_queue.py index bebdac7..5d87ddf 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -350,6 +350,45 @@ class TestQueue(RQTestCase): # DeferredJobRegistry should also be empty self.assertEqual(registry.get_job_ids(), []) + + def test_enqueue_dependents_on_multiple_queues(self): + """Enqueueing dependent jobs on multiple queues pushes jobs in the queues + and removes them from DeferredJobRegistry for each different queue.""" + q_1 = Queue("queue_1") + q_2 = Queue("queue_2") + parent_job = Job.create(func=say_hello) + parent_job.save() + job_1 = q_1.enqueue(say_hello, depends_on=parent_job) + job_2 = q_2.enqueue(say_hello, depends_on=parent_job) + + # Each queue has its own DeferredJobRegistry + registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn) + self.assertEqual( + 
set(registry_1.get_job_ids()), + set([job_1.id]) + ) + registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) + self.assertEqual( + set(registry_2.get_job_ids()), + set([job_2.id]) + ) + + # After the dependents are enqueued, job_1 should be in queue_1 and + # job_2 should be in queue_2 + self.assertEqual(q_1.job_ids, []) + self.assertEqual(q_2.job_ids, []) + q_1.enqueue_dependents(parent_job) + q_2.enqueue_dependents(parent_job) + self.assertEqual(set(q_1.job_ids), set([job_1.id])) + self.assertEqual(set(q_2.job_ids), set([job_2.id])) + self.assertFalse(self.testconn.exists(parent_job.dependents_key)) + + # DeferredJobRegistry should also be empty + self.assertEqual(registry_1.get_job_ids(), []) + self.assertEqual(registry_2.get_job_ids(), []) + + + def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued