From b4f157516e1a0fb9ac54a5d496374fb067b6bb52 Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Tue, 21 Jul 2015 12:52:15 +0200
Subject: [PATCH 1/6] enqueue_dependents now uses origin queue to enqueue

The current implementation uses the first job's queue to insert all
dependent jobs. This makes RQ ignore the original queues where those
jobs were enqueued. With this change, we use the job's origin attribute
to fetch the original queue, then insert each dependent job into the
queue it came from.

Fixes issue #512
---
 rq/queue.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index 38aa05f..2120fe5 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -289,15 +289,22 @@ class Queue(object):
         # TODO: can probably be pipelined
         from .registry import DeferredJobRegistry
 
-        registry = DeferredJobRegistry(self.name, self.connection)
+
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
             dependent = self.job_class.fetch(job_id, connection=self.connection)
-            registry.remove(dependent)
-            self.enqueue_job(dependent)
+            registry = DeferredJobRegistry(dependent.origin, self.connection)
+            with self.connection._pipeline() as pipeline:
+                registry.remove(dependent, pipeline=pipeline)
+                if dependent.origin == self.name:
+                    self.enqueue_job(dependent)
+                else:
+                    queue = Queue(name=dependent.origin, connection=self.connection)
+                    queue.enqueue_job(dependent)
+                pipeline.execute()
 
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""

From c136209804bccc26531b76212b3f027bf6c51e4e Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Tue, 21 Jul 2015 13:44:18 +0200
Subject: [PATCH 2/6] Add test_enqueue_dependents_on_multiple_queues test

---
 tests/test_queue.py | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/tests/test_queue.py b/tests/test_queue.py
index bebdac7..01ca581 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -350,6 +350,44 @@ class TestQueue(RQTestCase):
         # DeferredJobRegistry should also be empty
         self.assertEqual(registry.get_job_ids(), [])
 
+    def test_enqueue_dependents_on_multiple_queues(self):
+        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
+        and removes them from DeferredJobRegistry for each different queue."""
+        q_1 = Queue("queue_1")
+        q_2 = Queue("queue_2")
+        parent_job = Job.create(func=say_hello)
+        parent_job.save()
+        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
+        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
+
+        # Each queue has its own DeferredJobRegistry
+        registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_1.get_job_ids()),
+            set([job_1.id])
+        )
+
+        registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+        self.assertEqual(
+            set(registry_2.get_job_ids()),
+            set([job_2.id])
+        )
+
+        # After the dependents are enqueued, job_1 should be on queue_1
+        # and job_2 should be on queue_2
+        self.assertEqual(q_1.job_ids, [])
+        self.assertEqual(q_2.job_ids, [])
+        q_1.enqueue_dependents(parent_job)
+        q_2.enqueue_dependents(parent_job)
+        self.assertEqual(set(q_1.job_ids), set([job_1.id]))
+        self.assertEqual(set(q_2.job_ids), set([job_2.id]))
+        self.assertFalse(self.testconn.exists(parent_job.dependents_key))
+
+        # DeferredJobRegistry should also be empty
+        self.assertEqual(registry_1.get_job_ids(), [])
+        self.assertEqual(registry_2.get_job_ids(), [])
+
+
     def test_enqueue_job_with_dependency(self):
         """Jobs are enqueued only when their dependencies are finished."""
         # Job with unfinished dependency is not immediately enqueued
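[Series note] The two patches above establish the contract that a dependent
job is always pushed onto the queue recorded in its origin attribute, no
matter which queue's enqueue_dependents() pops it. A minimal sketch of that
contract from the caller's side, assuming a Redis server on localhost and a
hypothetical importable task module myapp.tasks (neither is part of this
series):

    from redis import StrictRedis
    from rq import Queue

    from myapp.tasks import say_hello  # hypothetical importable task

    connection = StrictRedis()
    q_high = Queue('high', connection=connection)
    q_low = Queue('low', connection=connection)

    parent = q_high.enqueue(say_hello)
    # The child is deferred until its parent finishes; its origin is 'low'.
    child = q_low.enqueue(say_hello, depends_on=parent)

    # Even when the dependents are popped through the 'high' queue, the
    # child must land back on 'low', because enqueue_dependents() now
    # follows child.origin instead of the queue it was called on.
    q_high.enqueue_dependents(parent)
    assert child.id in q_low.job_ids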
From c76c59133bfb4ff82d398b85f4f8e796343efaf0 Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Sun, 9 Aug 2015 17:45:21 +0200
Subject: [PATCH 3/6] Fix ValueError exception on relative import

test_job raised an exception:

ValueError: Attempted relative import in non-package
---
 tests/test_job.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_job.py b/tests/test_job.py
index 6971d7b..1c92d84 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
 from datetime import datetime
 import time
 
-from tests import RQTestCase
+from tests import RQTestCase, fixtures
 from tests.helpers import strip_microseconds
 
 from rq.compat import PY2, as_text
@@ -16,7 +16,7 @@ from rq.registry import DeferredJobRegistry
 from rq.utils import utcformat
 from rq.worker import Worker
 
-from . import fixtures
+
 
 try:
     from cPickle import loads, dumps

From 8a6bc08e4779b6088c4bbe5d3e2182d368615776 Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Mon, 24 Aug 2015 17:13:33 +0200
Subject: [PATCH 4/6] Add pipeline named argument to enqueue_job

---
 rq/queue.py | 29 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index 2120fe5..e286dec 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -260,24 +260,26 @@ class Queue(object):
                                  description=description, depends_on=depends_on,
                                  job_id=job_id, at_front=at_front)
 
-    def enqueue_job(self, job, at_front=False):
+    def enqueue_job(self, job, pipeline=None, at_front=False):
         """Enqueues a job for delayed execution.
 
         If Queue is instantiated with async=False, job is executed immediately.
         """
-        with self.connection._pipeline() as pipeline:
-            # Add Queue key set
-            self.connection.sadd(self.redis_queues_keys, self.key)
-            job.set_status(JobStatus.QUEUED, pipeline=pipeline)
+        if not pipeline:
+            pipeline = self.connection._pipeline()
 
-            job.origin = self.name
-            job.enqueued_at = utcnow()
+        # Add Queue key set
+        self.connection.sadd(self.redis_queues_keys, self.key)
+        job.set_status(JobStatus.QUEUED, pipeline=pipeline)
 
-            if job.timeout is None:
-                job.timeout = self.DEFAULT_TIMEOUT
-            job.save(pipeline=pipeline)
+        job.origin = self.name
+        job.enqueued_at = utcnow()
 
-            pipeline.execute()
+        if job.timeout is None:
+            job.timeout = self.DEFAULT_TIMEOUT
+        job.save(pipeline=pipeline)
+
+        pipeline.execute()
 
         if self._async:
             self.push_job_id(job.id, at_front=at_front)
@@ -300,11 +302,10 @@ class Queue(object):
             with self.connection._pipeline() as pipeline:
                 registry.remove(dependent, pipeline=pipeline)
                 if dependent.origin == self.name:
-                    self.enqueue_job(dependent)
+                    self.enqueue_job(dependent, pipeline=pipeline)
                 else:
                     queue = Queue(name=dependent.origin, connection=self.connection)
-                    queue.enqueue_job(dependent)
-                pipeline.execute()
+                    queue.enqueue_job(dependent, pipeline=pipeline)
 
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""
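[Series note] PATCH 4/6 above lets a caller hand enqueue_job() an existing
pipeline, so the job's status change and hash write are buffered together
with the caller's own commands. A rough sketch of the call pattern it
enables, assuming the same hypothetical myapp.tasks module as before:

    from redis import StrictRedis
    from rq import Queue
    from rq.job import Job

    from myapp.tasks import say_hello  # hypothetical importable task

    connection = StrictRedis()
    queue = Queue('default', connection=connection)
    job = Job.create(func=say_hello, connection=connection)

    with connection.pipeline() as pipe:
        # set_status() and save() are buffered on `pipe` here.
        queue.enqueue_job(job, pipeline=pipe)
        # Caveat: at this point in the series enqueue_job() still calls
        # pipeline.execute() itself, even on a borrowed pipeline; the
        # final patch below removes that surprise.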
From a2d0e4f93318217dd5ab185338ff4289b9447059 Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Mon, 24 Aug 2015 17:14:28 +0200
Subject: [PATCH 5/6] Clarify test_enqueue_dependents_on_multiple_queues

---
 tests/test_queue.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/test_queue.py b/tests/test_queue.py
index 01ca581..5d87ddf 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -350,8 +350,9 @@ class TestQueue(RQTestCase):
         # DeferredJobRegistry should also be empty
         self.assertEqual(registry.get_job_ids(), [])
 
+
     def test_enqueue_dependents_on_multiple_queues(self):
-        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
+        """Enqueueing dependent jobs on multiple queues pushes jobs in the queues
         and removes them from DeferredJobRegistry for each different queue."""
         q_1 = Queue("queue_1")
         q_2 = Queue("queue_2")

From d3d9a206808c7d5f8edc783a24ba064eb7147587 Mon Sep 17 00:00:00 2001
From: Javier Lopez
Date: Thu, 27 Aug 2015 11:37:34 +0200
Subject: [PATCH 6/6] Do not call pipeline.execute() if pipeline is given as argument

---
 rq/queue.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index e286dec..8d01e75 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -265,21 +265,21 @@ class Queue(object):
 
         If Queue is instantiated with async=False, job is executed immediately.
         """
-        if not pipeline:
-            pipeline = self.connection._pipeline()
+        pipe = pipeline if pipeline is not None else self.connection._pipeline()
 
         # Add Queue key set
-        self.connection.sadd(self.redis_queues_keys, self.key)
-        job.set_status(JobStatus.QUEUED, pipeline=pipeline)
+        pipe.sadd(self.redis_queues_keys, self.key)
+        job.set_status(JobStatus.QUEUED, pipeline=pipe)
 
         job.origin = self.name
         job.enqueued_at = utcnow()
 
         if job.timeout is None:
             job.timeout = self.DEFAULT_TIMEOUT
-        job.save(pipeline=pipeline)
+        job.save(pipeline=pipe)
 
-        pipeline.execute()
+        if pipeline is None:
+            pipe.execute()
 
         if self._async:
             self.push_job_id(job.id, at_front=at_front)
@@ -291,8 +291,6 @@ class Queue(object):
         # TODO: can probably be pipelined
         from .registry import DeferredJobRegistry
-
-
         while True:
             job_id = as_text(self.connection.spop(job.dependents_key))
             if job_id is None:
                 break
@@ -306,6 +304,7 @@ class Queue(object):
                 else:
                     queue = Queue(name=dependent.origin, connection=self.connection)
                     queue.enqueue_job(dependent, pipeline=pipeline)
+                pipeline.execute()
 
     def pop_job_id(self):
         """Pops a given job ID from this Redis queue."""
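[Series note] After PATCH 6/6, the pipeline ownership rules are: enqueue_job()
executes only a pipeline it created itself, a caller-supplied pipeline is left
unexecuted for the caller to flush, and enqueue_dependents() opens one pipeline
per dependent and executes it after batching the registry removal with the
enqueue. A closing sketch of the two call styles, with the same hypothetical
myapp.tasks module as above:

    from redis import StrictRedis
    from rq import Queue
    from rq.job import Job

    from myapp.tasks import say_hello  # hypothetical importable task

    connection = StrictRedis()
    queue = Queue('default', connection=connection)

    # 1. No pipeline given: enqueue_job() creates and executes its own.
    job = Job.create(func=say_hello, connection=connection)
    queue.enqueue_job(job)

    # 2. Borrowed pipeline: the status and job-hash writes stay buffered
    #    until the caller flushes them (the queue push itself still goes
    #    through the plain connection immediately).
    other = Job.create(func=say_hello, connection=connection)
    with connection.pipeline() as pipe:
        queue.enqueue_job(other, pipeline=pipe)
        pipe.execute()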