From 41ae1ce8a704c93bca0dd7acde7c045b133d19c6 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 9 Sep 2014 23:54:33 +0700 Subject: [PATCH 1/5] Added a registry.get_job_count(). --- rq/registry.py | 9 +++++++++ tests/test_job_started_registry.py | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/rq/registry.py b/rq/registry.py index afa7b5b..2e5fd22 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -21,6 +21,15 @@ class StartedJobRegistry: self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) + def __len__(self): + """Returns the number of jobs in this registry""" + return self.get_job_count() + + def get_job_count(self): + """Returns the number of jobs in this registry""" + self.move_expired_jobs_to_failed_queue() + return self.connection.zcard(self.key) + def add(self, job, timeout, pipeline=None): """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout diff --git a/tests/test_job_started_registry.py b/tests/test_job_started_registry.py index addb1db..312f762 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_job_started_registry.py @@ -76,3 +76,10 @@ class TestQueue(RQTestCase): worker.perform_job(job) self.assertNotIn(job.id, registry.get_job_ids()) + + def test_get_job_count(self): + """StartedJobRegistry returns the right number of job count.""" + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 10, 'bar') + self.assertEqual(self.registry.get_job_count(), 2) + self.assertEqual(len(self.registry), 2) From fae7df5aa7a5db820f4e60533e0151f8144e8af7 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 9 Sep 2014 23:58:35 +0700 Subject: [PATCH 2/5] Renamed StartedJobRegistry.get_job_count to StartedJobRegistry.count for consistency. --- rq/registry.py | 5 +++-- tests/test_job_started_registry.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 2e5fd22..0242280 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -23,9 +23,10 @@ class StartedJobRegistry: def __len__(self): """Returns the number of jobs in this registry""" - return self.get_job_count() + return self.count - def get_job_count(self): + @property + def count(self): """Returns the number of jobs in this registry""" self.move_expired_jobs_to_failed_queue() return self.connection.zcard(self.key) diff --git a/tests/test_job_started_registry.py b/tests/test_job_started_registry.py index 312f762..f99aafd 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_job_started_registry.py @@ -81,5 +81,5 @@ class TestQueue(RQTestCase): """StartedJobRegistry returns the right number of job count.""" self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 10, 'bar') - self.assertEqual(self.registry.get_job_count(), 2) + self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) From c1dc30eae30aefe9ab501012a3cd9c5e2aac0c58 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 10 Sep 2014 00:00:16 +0700 Subject: [PATCH 3/5] Added __len__ method to Queue. --- rq/queue.py | 3 +++ tests/test_queue.py | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 621942a..7569104 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -68,6 +68,9 @@ class Queue(object): job_class = import_attribute(job_class) self.job_class = job_class + def __len__(self): + return self.count + @property def key(self): """Returns the Redis key for this Queue.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index c60b510..e61568e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -101,11 +101,13 @@ class TestQueue(RQTestCase): q.enqueue(say_hello, 'Charlie') self.testconn.lpush(q.key, '1', '2') - self.assertEquals(q.count, 4) + self.assertEqual(q.count, 4) + self.assertEqual(len(q), 4) q.compact() - self.assertEquals(q.count, 2) + self.assertEqual(q.count, 2) + self.assertEqual(len(q), 2) def test_enqueue(self): """Enqueueing job onto queues.""" From 74a9982ecb4a01d8f60911b071c90cded806ce39 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 13 Sep 2014 17:13:18 +0700 Subject: [PATCH 4/5] Fixed a bug where expired jobs aren't deleted from JobStartedRegistry. --- rq/registry.py | 38 ++++++++++++++++++++---------- tests/test_job_started_registry.py | 13 ++++++---- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 0242280..59becfe 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -4,21 +4,17 @@ from .queue import FailedQueue from .utils import current_timestamp -class StartedJobRegistry: +class BaseRegistry(object): """ - Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. - StartedJobRegistry contains job keys that are currently being executed. - Each key is scored by job's expiration time (datetime started + timeout). + Base implementation of job registry, implemented in Redis sorted set. Each job + is stored as a key in the registry, scored by expiration time (unix timestamp). - Jobs are added to registry right before they are executed and removed - right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". + Jobs with scores are lower than current time is considered "expired" and + should be cleaned up. """ def __init__(self, name='default', connection=None): self.name = name - self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) def __len__(self): @@ -28,7 +24,7 @@ class StartedJobRegistry: @property def count(self): """Returns the number of jobs in this registry""" - self.move_expired_jobs_to_failed_queue() + self.cleanup() return self.connection.zcard(self.key) def add(self, job, timeout, pipeline=None): @@ -50,11 +46,28 @@ class StartedJobRegistry: def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" - self.move_expired_jobs_to_failed_queue() + self.cleanup() return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] - def move_expired_jobs_to_failed_queue(self): + +class StartedJobRegistry(BaseRegistry): + """ + Registry of currently executing jobs. Each queue maintains a + StartedJobRegistry. Jobs in this registry are ones that are currently + being executed. + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name='default', connection=None): + super(StartedJobRegistry, self).__init__(name, connection) + self.key = 'rq:wip:%s' % name + + def cleanup(self): """Remove expired jobs from registry and add them to FailedQueue.""" job_ids = self.get_expired_job_ids() @@ -63,6 +76,7 @@ class StartedJobRegistry: with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.zremrangebyscore(self.key, 0, current_timestamp()) pipeline.execute() return job_ids diff --git a/tests/test_job_started_registry.py b/tests/test_job_started_registry.py index f99aafd..eba1c36 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_job_started_registry.py @@ -33,8 +33,9 @@ class TestQueue(RQTestCase): def test_get_job_ids(self): """Getting job ids from StartedJobRegistry.""" - self.testconn.zadd(self.registry.key, 1, 'foo') - self.testconn.zadd(self.registry.key, 10, 'bar') + timestamp = current_timestamp() + self.testconn.zadd(self.registry.key, timestamp + 10, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 20, 'bar') self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): @@ -51,8 +52,9 @@ class TestQueue(RQTestCase): failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) self.testconn.zadd(self.registry.key, 1, 'foo') - self.registry.move_expired_jobs_to_failed_queue() + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) def test_job_execution(self): """Job is removed from StartedJobRegistry after execution.""" @@ -79,7 +81,8 @@ class TestQueue(RQTestCase): def test_get_job_count(self): """StartedJobRegistry returns the right number of job count.""" - self.testconn.zadd(self.registry.key, 1, 'foo') - self.testconn.zadd(self.registry.key, 10, 'bar') + timestamp = current_timestamp() + 10 + self.testconn.zadd(self.registry.key, timestamp, 'foo') + self.testconn.zadd(self.registry.key, timestamp, 'bar') self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) From 2307bc8253ef44e965bb694a47974244acdd2ead Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 14 Sep 2014 10:29:17 +0700 Subject: [PATCH 5/5] Added FinishedJobRegistry to monitor finished jobs. --- rq/registry.py | 15 ++++++++ rq/worker.py | 12 ++++-- ...b_started_registry.py => test_registry.py} | 38 +++++++++++++++++-- 3 files changed, 58 insertions(+), 7 deletions(-) rename tests/{test_job_started_registry.py => test_registry.py} (70%) diff --git a/rq/registry.py b/rq/registry.py index 59becfe..c59bf88 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -80,3 +80,18 @@ class StartedJobRegistry(BaseRegistry): pipeline.execute() return job_ids + + +class FinishedJobRegistry(BaseRegistry): + """ + Registry of jobs that have been completed. Jobs are added to this + registry after they have successfully completed for monitoring purposes. + """ + + def __init__(self, name='default', connection=None): + super(FinishedJobRegistry, self).__init__(name, connection) + self.key = 'rq:finished:%s' % name + + def cleanup(self): + """Remove expired jobs from registry.""" + self.connection.zremrangebyscore(self.key, 0, current_timestamp()) diff --git a/rq/worker.py b/rq/worker.py index fbf981d..48e22a9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .registry import StartedJobRegistry +from .registry import FinishedJobRegistry, StartedJobRegistry try: from procname import setprocname @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - registry = StartedJobRegistry(job.origin, self.connection) + started_job_registry = StartedJobRegistry(job.origin) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -513,14 +513,18 @@ class Worker(object): job.ended_at = utcnow() job._status = Status.FINISHED job.save(pipeline=pipeline) + + finished_job_registry = FinishedJobRegistry(job.origin) + finished_job_registry.add(job, result_ttl, pipeline) + job.cleanup(result_ttl, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) return False diff --git a/tests/test_job_started_registry.py b/tests/test_registry.py similarity index 70% rename from tests/test_job_started_registry.py rename to tests/test_registry.py index eba1c36..ce4a345 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_registry.py @@ -5,16 +5,16 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import StartedJobRegistry +from rq.registry import FinishedJobRegistry, StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello -class TestQueue(RQTestCase): +class TestRegistry(RQTestCase): def setUp(self): - super(TestQueue, self).setUp() + super(TestRegistry, self).setUp() self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): @@ -86,3 +86,35 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.registry.key, timestamp, 'bar') self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) + + +class TestFinishedJobRegistry(RQTestCase): + + def setUp(self): + super(TestFinishedJobRegistry, self).setUp() + self.registry = FinishedJobRegistry(connection=self.testconn) + + def test_cleanup(self): + """Finished job registry removes expired jobs.""" + timestamp = current_timestamp() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + + self.registry.cleanup() + self.assertEqual(self.registry.get_job_ids(), ['bar']) + + def test_jobs_are_put_in_registry(self): + """Completed jobs are added to FinishedJobRegistry.""" + self.assertEqual(self.registry.get_job_ids(), []) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + # Completed jobs are put in FinishedJobRegistry + job = queue.enqueue(say_hello) + worker.perform_job(job) + self.assertEqual(self.registry.get_job_ids(), [job.id]) + + # Failed jobs are not put in FinishedJobRegistry + failed_job = queue.enqueue(div_by_zero) + worker.perform_job(failed_job) + self.assertEqual(self.registry.get_job_ids(), [job.id])