From 543bcbc60b117e424995483cb3dcfe8dbfbbd77e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 21 Jan 2015 16:12:08 -0800 Subject: [PATCH 01/10] Explicitly cast map() result to a list, for Python 3 compat. --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3ba297a..929f91d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -166,11 +166,11 @@ class Worker(object): def queue_names(self): """Returns the queue names of this worker's queues.""" - return map(lambda q: q.name, self.queues) + return list(map(lambda q: q.name, self.queues)) def queue_keys(self): """Returns the Redis keys representing this worker's queues.""" - return map(lambda q: q.key, self.queues) + return list(map(lambda q: q.key, self.queues)) @property def name(self): From 01ab2f20dd90db569ec7ea954fb0c3341fee88d2 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 13:58:25 +0700 Subject: [PATCH 02/10] Minor copywriting fixes. --- rq/worker.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3ba297a..c0cb296 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -67,10 +67,13 @@ def signal_name(signum): return 'SIG_UNKNOWN' -WorkerStatus = enum('WorkerStatus', - STARTED='started', SUSPENDED='suspended', BUSY='busy', - IDLE='idle' - ) +WorkerStatus = enum( + 'WorkerStatus', + STARTED='started', + SUSPENDED='suspended', + BUSY='busy', + IDLE='idle' +) class Worker(object): @@ -342,7 +345,7 @@ class Worker(object): signal.signal(signal.SIGTERM, request_stop) def check_for_suspension(self, burst): - """Check to see if the workers have been suspended by something like `rq suspend`""" + """Check to see if workers have been suspended by `rq suspend`""" before_state = None notified = False @@ -350,8 +353,8 @@ class Worker(object): while not self.stopped and is_suspended(self.connection): if burst: - self.log.info('Suspended in burst mode -- exiting.') - self.log.info('Note: There could still be unperformed jobs on the queue') + self.log.info('Suspended in burst mode -- exiting.' + 'Note: There could still be unperformed jobs on the queue') raise StopRequested if not notified: From dc09676ee03b978691d5d3c0bc725ab30bc0077a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 13:58:25 +0700 Subject: [PATCH 03/10] Minor copywriting fixes. --- rq/worker.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 929f91d..b971a24 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -67,10 +67,13 @@ def signal_name(signum): return 'SIG_UNKNOWN' -WorkerStatus = enum('WorkerStatus', - STARTED='started', SUSPENDED='suspended', BUSY='busy', - IDLE='idle' - ) +WorkerStatus = enum( + 'WorkerStatus', + STARTED='started', + SUSPENDED='suspended', + BUSY='busy', + IDLE='idle' +) class Worker(object): @@ -342,7 +345,7 @@ class Worker(object): signal.signal(signal.SIGTERM, request_stop) def check_for_suspension(self, burst): - """Check to see if the workers have been suspended by something like `rq suspend`""" + """Check to see if workers have been suspended by `rq suspend`""" before_state = None notified = False @@ -350,8 +353,8 @@ class Worker(object): while not self.stopped and is_suspended(self.connection): if burst: - self.log.info('Suspended in burst mode -- exiting.') - self.log.info('Note: There could still be unperformed jobs on the queue') + self.log.info('Suspended in burst mode -- exiting.' + 'Note: There could still be unperformed jobs on the queue') raise StopRequested if not notified: From dc5cd514ee757b401415bd1dccdf70831c4797ef Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 21 Jan 2015 16:12:08 -0800 Subject: [PATCH 04/10] Explicitly cast map() result to a list, for Python 3 compat. --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c0cb296..b971a24 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -169,11 +169,11 @@ class Worker(object): def queue_names(self): """Returns the queue names of this worker's queues.""" - return map(lambda q: q.name, self.queues) + return list(map(lambda q: q.name, self.queues)) def queue_keys(self): """Returns the Redis keys representing this worker's queues.""" - return map(lambda q: q.key, self.queues) + return list(map(lambda q: q.key, self.queues)) @property def name(self): From 7fd2ac8ca6bc737974d11a6f2aedd20679a5fa17 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 15:16:32 +0700 Subject: [PATCH 05/10] Added "DEFERRED" Job status for jobs that have unsatisfied dependencies. --- rq/job.py | 11 ++++++++--- rq/queue.py | 2 +- tests/test_queue.py | 6 ++++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index 9afe578..2337ad4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -25,9 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL) loads = pickle.loads -JobStatus = enum('JobStatus', - QUEUED='queued', FINISHED='finished', FAILED='failed', - STARTED='started') +JobStatus = enum( + 'JobStatus', + QUEUED='queued', + FINISHED='finished', + FAILED='failed', + STARTED='started', + DEFERRED='deferred' +) # Sentinel value to mark that some of our lazily evaluated properties have not # yet been evaluated. diff --git a/rq/queue.py b/rq/queue.py index 7352459..9cc0fa0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -182,7 +182,6 @@ class Queue(object): """ timeout = timeout or self._default_timeout - # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=JobStatus.QUEUED, description=description, depends_on=depends_on, timeout=timeout, @@ -200,6 +199,7 @@ class Queue(object): try: pipe.watch(depends_on.key) if depends_on.get_status() != JobStatus.FINISHED: + job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) pipe.execute() diff --git a/tests/test_queue.py b/tests/test_queue.py index 0a84e78..cddb3f6 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -342,8 +342,9 @@ class TestQueue(RQTestCase): # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() - q.enqueue_call(say_hello, depends_on=parent_job) + job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, []) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) # Jobs dependent on finished jobs are immediately enqueued parent_job.set_status(JobStatus.FINISHED) @@ -351,6 +352,7 @@ class TestQueue(RQTestCase): job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) + self.assertEqual(job.get_status(), JobStatus.QUEUED) def test_enqueue_job_with_dependency_by_id(self): """Enqueueing jobs should work as expected by id as well as job-objects.""" @@ -368,7 +370,7 @@ class TestQueue(RQTestCase): self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) def test_enqueue_job_with_dependency_and_timeout(self): - """Jobs still know their specified timeout after being scheduled as a dependency.""" + """Jobs remember their timeout when enqueued as a dependency.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() From 9320496402c1a92d946eaac28bfb03aae194c0b0 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 16:05:29 +0700 Subject: [PATCH 06/10] Simplify FailedQueue.quarantine and ensure that a deferred job's status is set to Queued when enqueued. --- rq/queue.py | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 9cc0fa0..eb50314 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -248,24 +248,25 @@ class Queue(object): description=description, depends_on=depends_on, job_id=job_id, at_front=at_front) - def enqueue_job(self, job, set_meta_data=True, at_front=False): + def enqueue_job(self, job, at_front=False): """Enqueues a job for delayed execution. - If the `set_meta_data` argument is `True` (default), it will update - the properties `origin` and `enqueued_at`. - If Queue is instantiated with async=False, job is executed immediately. """ - # Add Queue key set - self.connection.sadd(self.redis_queues_keys, self.key) + + with self.connection._pipeline() as pipeline: + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + job.set_status(JobStatus.QUEUED, pipeline=pipeline) - if set_meta_data: job.origin = self.name job.enqueued_at = utcnow() - if job.timeout is None: - job.timeout = self.DEFAULT_TIMEOUT - job.save() + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT + job.save(pipeline=pipeline) + + pipeline.execute() if self._async: self.push_job_id(job.id, at_front=at_front) @@ -401,14 +402,20 @@ class FailedQueue(Queue): def quarantine(self, job, exc_info): """Puts the given Job in quarantine (i.e. put it on the failed queue). - - This is different from normal job enqueueing, since certain meta data - must not be overridden (e.g. `origin` or `enqueued_at`) and other meta - data must be inserted (`ended_at` and `exc_info`). """ - job.ended_at = utcnow() - job.exc_info = exc_info - return self.enqueue_job(job, set_meta_data=False) + + with self.connection._pipeline() as pipeline: + # Add Queue key set + self.connection.sadd(self.redis_queues_keys, self.key) + + job.ended_at = utcnow() + job.exc_info = exc_info + job.save(pipeline=pipeline) + + self.push_job_id(job.id, pipeline=pipeline) + pipeline.execute() + + return job def requeue(self, job_id): """Requeues the job with the given job ID.""" From 3e674fbe6aa20162c094e93fb9a01ead2dabc43d Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 16:33:48 +0700 Subject: [PATCH 07/10] queue.enqueue() should set job.origin. --- rq/job.py | 7 +++++-- rq/queue.py | 9 +++++---- tests/test_queue.py | 3 +-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/rq/job.py b/rq/job.py index 2337ad4..fbf7d5a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -88,8 +88,8 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, - id=None): + result_ttl=None, ttl=None, status=None, description=None, + depends_on=None, timeout=None, id=None, origin=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -107,6 +107,9 @@ class Job(object): if id is not None: job.set_id(id) + if origin is not None: + job.origin = origin + # Set the core job tuple properties job._instance = None if inspect.ismethod(func): diff --git a/rq/queue.py b/rq/queue.py index eb50314..d596106 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -182,10 +182,11 @@ class Queue(object): """ timeout = timeout or self._default_timeout - job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=JobStatus.QUEUED, - description=description, depends_on=depends_on, timeout=timeout, - id=job_id) + job = self.job_class.create( + func, args, kwargs, connection=self.connection, + result_ttl=result_ttl, status=JobStatus.QUEUED, + description=description, depends_on=depends_on, + timeout=timeout, id=job_id, origin=self.name) # If job depends on an unfinished job, register itself on it's # parent's dependents instead of enqueueing it. diff --git a/tests/test_queue.py b/tests/test_queue.py index cddb3f6..f5edcbf 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -117,6 +117,7 @@ class TestQueue(RQTestCase): # say_hello spec holds which queue this is sent to job = q.enqueue(say_hello, 'Nick', foo='bar') job_id = job.id + self.assertEqual(job.origin, q.name) # Inspect data inside Redis q_key = 'rq:queue:default' @@ -131,14 +132,12 @@ class TestQueue(RQTestCase): job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar')) # Preconditions - self.assertIsNone(job.origin) self.assertIsNone(job.enqueued_at) # Action q.enqueue_job(job) # Postconditions - self.assertEquals(job.origin, q.name) self.assertIsNotNone(job.enqueued_at) def test_pop_job_id(self): From dac0be6cc79dd78f8039e4b5b989db425ed380b3 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 23 Jan 2015 17:53:27 +0700 Subject: [PATCH 08/10] Added DeferredJobsRegistry to keep track of deferred jobs. --- rq/job.py | 8 +++++++- rq/queue.py | 5 +++++ rq/registry.py | 18 +++++++++++++++++- tests/test_job.py | 11 +++++++++-- tests/test_queue.py | 22 ++++++++++++++-------- tests/test_registry.py | 19 ++++++++++++++++++- 6 files changed, 70 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index fbf7d5a..d815c5b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -544,8 +544,14 @@ class Job(object): rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} - This method adds the current job in its dependency's dependents set. + This method adds the job in its dependency's dependents set + and adds the job to DeferredJobRegistry. """ + from .registry import DeferredJobRegistry + + registry = DeferredJobRegistry(self.origin, connection=self.connection) + registry.add(self, pipeline=pipeline) + connection = pipeline if pipeline is not None else self.connection connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) diff --git a/rq/queue.py b/rq/queue.py index d596106..e2c358c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -279,11 +279,16 @@ class Queue(object): def enqueue_dependents(self, job): """Enqueues all jobs in the given job's dependents set and clears it.""" # TODO: can probably be pipelined + from .registry import DeferredJobRegistry + + registry = DeferredJobRegistry(self.name, self.connection) + while True: job_id = as_text(self.connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) + registry.remove(dependent) self.enqueue_job(dependent) def pop_job_id(self): diff --git a/rq/registry.py b/rq/registry.py index b4cf43f..08798eb 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -24,7 +24,7 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, ttl, pipeline=None): + def add(self, job, ttl=0, pipeline=None): """Adds a job to a registry with expiry time of now + ttl.""" score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: @@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry): """ score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) + + +class DeferredJobRegistry(BaseRegistry): + """ + Registry of deferred jobs (waiting for another job to finish). + """ + + def __init__(self, name='default', connection=None): + super(DeferredJobRegistry, self).__init__(name, connection) + self.key = 'rq:deferred:%s' % name + + def cleanup(self): + """This method is only here to prevent errors because this method is + automatically called by `count()` and `get_job_ids()` methods + implemented in BaseRegistry.""" + pass diff --git a/tests/test_job.py b/tests/test_job.py index 34859a7..b7854b3 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,6 +7,7 @@ from datetime import datetime from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job +from rq.registry import DeferredJobRegistry from rq.queue import Queue from rq.utils import utcformat @@ -331,12 +332,18 @@ class TestJob(RQTestCase): self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) def test_register_dependency(self): - """Test that jobs updates the correct job dependents.""" - job = Job.create(func=say_hello) + """Ensure dependency registration works properly.""" + origin = 'some_queue' + registry = DeferredJobRegistry(origin, self.testconn) + + job = Job.create(func=say_hello, origin=origin) job._dependency_id = 'id' job.save() + + self.assertEqual(registry.get_job_ids(), []) job.register_dependency() self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) + self.assertEqual(registry.get_job_ids(), [job.id]) def test_cancel(self): """job.cancel() deletes itself & dependents mapping from Redis.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index f5edcbf..369b3f1 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function, from rq import get_failed_queue, Queue from rq.exceptions import InvalidJobOperationError from rq.job import Job, JobStatus +from rq.registry import DeferredJobRegistry from rq.worker import Worker from tests import RQTestCase @@ -319,23 +320,28 @@ class TestQueue(RQTestCase): self.assertEquals(len(Queue.all()), 3) def test_enqueue_dependents(self): - """Enqueueing the dependent jobs pushes all jobs in the depends set to the queue.""" + """Enqueueing dependent jobs pushes all jobs in the depends set to the queue + and removes them from DeferredJobQueue.""" q = Queue() parent_job = Job.create(func=say_hello) parent_job.save() - job_1 = Job.create(func=say_hello, depends_on=parent_job) - job_1.save() - job_1.register_dependency() - job_2 = Job.create(func=say_hello, depends_on=parent_job) - job_2.save() - job_2.register_dependency() + job_1 = q.enqueue(say_hello, depends_on=parent_job) + job_2 = q.enqueue(say_hello, depends_on=parent_job) + registry = DeferredJobRegistry(q.name, connection=self.testconn) + self.assertEqual( + set(registry.get_job_ids()), + set([job_1.id, job_2.id]) + ) # After dependents is enqueued, job_1 and job_2 should be in queue self.assertEqual(q.job_ids, []) q.enqueue_dependents(parent_job) - self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id])) + self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id])) self.assertFalse(self.testconn.exists(parent_job.dependents_key)) + # DeferredJobRegistry should also be empty + self.assertEqual(registry.get_job_ids(), []) + def test_enqueue_job_with_dependency(self): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued diff --git a/tests/test_registry.py b/tests/test_registry.py index 26470e3..3798a8f 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -5,7 +5,8 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import FinishedJobRegistry, StartedJobRegistry +from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, + StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -135,3 +136,19 @@ class TestFinishedJobRegistry(RQTestCase): failed_job = queue.enqueue(div_by_zero) worker.perform_job(failed_job) self.assertEqual(self.registry.get_job_ids(), [job.id]) + + +class TestRegistry(RQTestCase): + + def setUp(self): + super(TestRegistry, self).setUp() + self.registry = DeferredJobRegistry(connection=self.testconn) + + def test_add(self): + """Adding a job to DeferredJobsRegistry.""" + job = Job() + self.registry.add(job) + self.assertEqual( + self.testconn.zrange(self.registry.key, 0, -1), + [job.id] + ) \ No newline at end of file From de1cd8a83c8f1da732921d7b4536371f8b6e06b9 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 26 Jan 2015 08:31:58 +0700 Subject: [PATCH 09/10] Fixed test error in Python 3. --- tests/test_registry.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/test_registry.py b/tests/test_registry.py index 3798a8f..5f5cf75 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import +from rq.compat import as_text from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp @@ -120,7 +121,6 @@ class TestFinishedJobRegistry(RQTestCase): self.registry.cleanup(timestamp + 20) self.assertEqual(self.registry.get_job_ids(), ['baz']) - def test_jobs_are_put_in_registry(self): """Completed jobs are added to FinishedJobRegistry.""" self.assertEqual(self.registry.get_job_ids(), []) @@ -138,7 +138,7 @@ class TestFinishedJobRegistry(RQTestCase): self.assertEqual(self.registry.get_job_ids(), [job.id]) -class TestRegistry(RQTestCase): +class TestDeferredRegistry(RQTestCase): def setUp(self): super(TestRegistry, self).setUp() @@ -148,7 +148,6 @@ class TestRegistry(RQTestCase): """Adding a job to DeferredJobsRegistry.""" job = Job() self.registry.add(job) - self.assertEqual( - self.testconn.zrange(self.registry.key, 0, -1), - [job.id] - ) \ No newline at end of file + job_ids = [as_text(job_id) for job_id in + self.testconn.zrange(self.registry.key, 0, -1)] + self.assertEqual(job_ids, [job.id]) From 105b95e9b8eaa20bbd9b1a47742ffef71d2bd694 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 26 Jan 2015 08:37:03 +0700 Subject: [PATCH 10/10] Fixed an error in super call. --- tests/test_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_registry.py b/tests/test_registry.py index 5f5cf75..f54b315 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -141,7 +141,7 @@ class TestFinishedJobRegistry(RQTestCase): class TestDeferredRegistry(RQTestCase): def setUp(self): - super(TestRegistry, self).setUp() + super(TestDeferredRegistry, self).setUp() self.registry = DeferredJobRegistry(connection=self.testconn) def test_add(self):