From e61d1505bcb8765ab1cbecc17ea243d44cb587db Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:26:24 +0700 Subject: [PATCH 01/16] Added WorkingQueue class. --- rq/utils.py | 6 +++++ rq/working_queue.py | 38 ++++++++++++++++++++++++++++++++ tests/test_working_queue.py | 44 +++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 rq/working_queue.py create mode 100644 tests/test_working_queue.py diff --git a/rq/utils.py b/rq/utils.py index d85235e..885e5b0 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -8,6 +8,7 @@ terminal colorizing code, originally by Georg Brandl. from __future__ import (absolute_import, division, print_function, unicode_literals) +import calendar import importlib import datetime import logging @@ -229,3 +230,8 @@ def first(iterable, default=None, key=None): return el return default + + +def current_timestamp(): + """Returns current UTC timestamp""" + return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) diff --git a/rq/working_queue.py b/rq/working_queue.py new file mode 100644 index 0000000..3e5da45 --- /dev/null +++ b/rq/working_queue.py @@ -0,0 +1,38 @@ +from .connections import resolve_connection +from .queue import FailedQueue +from .utils import current_timestamp + + +class WorkingQueue: + """ + Registry of currently executing jobs. Each queue maintains a WorkingQueue. + WorkingQueue contains job keys that are currently being executed. + Each key is scored by job's expiration time (datetime started + timeout). + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name, connection=None): + self.name = name + self.key = 'rq:wip:%s' % name + self.connection = resolve_connection(connection) + + def add(self, job, timeout): + """Adds a job to WorkingQueue with expiry time of now + timeout.""" + return self.connection._zadd(self.key, current_timestamp() + timeout, + job.id) + + def remove(self, job): + return self.connection.zrem(self.key, job.id) + + def get_expired_job_ids(self): + """Returns job ids whose score are less than current timestamp.""" + return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + + def get_job_ids(self, start=0, end=-1): + """Returns list of all job ids.""" + return self.connection.zrange(self.key, start, end) + diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py new file mode 100644 index 0000000..9c1a1e5 --- /dev/null +++ b/tests/test_working_queue.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from rq.job import Job +from rq.utils import current_timestamp +from rq.working_queue import WorkingQueue + +from tests import RQTestCase + + +class TestQueue(RQTestCase): + + def setUp(self): + super(TestQueue, self).setUp() + self.working_queue = WorkingQueue('default', connection=self.testconn) + + def test_add_and_remove(self): + """Adding and removing job to WorkingQueue.""" + timestamp = current_timestamp() + job = Job() + + # Test that job is added with the right score + self.working_queue.add(job, 1000) + self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + timestamp + 1001) + + # Ensure that job is properly removed from sorted set + self.working_queue.remove(job) + self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + + def test_get_job_ids(self): + """Getting job ids from WorkingQueue.""" + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, 10, 'bar') + self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + + def test_get_expired_job_ids(self): + """Getting expired job ids form WorkingQueue.""" + timestamp = current_timestamp() + + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file From 90c7eeb1115b10d5620a7fa3fcd0d7e0332a471d Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:37:22 +0700 Subject: [PATCH 02/16] Implemented WorkingQueue.cleanup(). --- rq/queue.py | 5 +++-- rq/working_queue.py | 12 ++++++++++++ tests/test_working_queue.py | 13 ++++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 22aa160..a74e686 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -158,9 +158,10 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id): + def push_job_id(self, job_id, pipeline=None): """Pushes a job ID on the corresponding Redis queue.""" - self.connection.rpush(self.key, job_id) + connection = pipeline if pipeline is not None else self.connection + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): diff --git a/rq/working_queue.py b/rq/working_queue.py index 3e5da45..64cf1e9 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -36,3 +36,15 @@ class WorkingQueue: """Returns list of all job ids.""" return self.connection.zrange(self.key, start, end) + def cleanup(self): + """Removes expired job ids to FailedQueue.""" + job_ids = self.get_expired_job_ids() + + if job_ids: + failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: + for job_id in job_ids: + failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.execute() + + return job_ids diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 9c1a1e5..af6eeb4 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from rq.job import Job +from rq.queue import FailedQueue from rq.utils import current_timestamp from rq.working_queue import WorkingQueue @@ -41,4 +42,14 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.working_queue.key, 1, 'foo') self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + + def test_cleanup(self): + """Moving expired jobs to FailedQueue.""" + failed_queue = FailedQueue(connection=self.testconn) + self.assertTrue(failed_queue.is_empty()) + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.working_queue.cleanup() + self.assertIn('foo', failed_queue.job_ids) + + \ No newline at end of file From f38d0dc79141432bf417a07d8de23f5092d3cacc Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 13:36:35 +0800 Subject: [PATCH 03/16] Moved some logic into worker.prepare_job_execution to make things testable. --- rq/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 59c1624..2731c98 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION +from .working_queue import WorkingQueue try: from procname import setprocname @@ -403,7 +404,7 @@ class Worker(object): self.heartbeat() return result - def heartbeat(self, timeout=0): + def heartbeat(self, timeout=0, pipeline=None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -415,7 +416,8 @@ class Worker(object): only larger. """ timeout = max(timeout, self.default_worker_ttl) - self.connection.expire(self.key, timeout) + connection = pipeline if pipeline is not None else self.connection + connection.expire(self.key, timeout) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) @@ -468,19 +470,26 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def perform_job(self, job): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. + def prepare_job_execution(self, job): + """Performs misc bookkeeping like updating states prior to + job execution. """ - - self.set_state('busy') - self.set_current_job_id(job.id) - self.heartbeat((job.timeout or 180) + 60) + with self.connection._pipeline() as pipeline: + self.set_state('busy', pipeline=pipeline) + self.set_current_job_id(job.id, pipeline=pipeline) + self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + pipeline.execute() self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ + self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: try: job.set_status(Status.STARTED) From 893fc5a6ae2171f4cd574c0e6ed4483c151ed61a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 16:06:39 +0800 Subject: [PATCH 04/16] Add job to WorkingQueue before execution and remove from WorkingQueue after. --- rq/worker.py | 13 ++++++++++--- rq/working_queue.py | 16 ++++++++++------ tests/test_worker.py | 16 ++++++++++++++++ tests/test_working_queue.py | 29 ++++++++++++++++++++++++++--- 4 files changed, 62 insertions(+), 12 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 2731c98..41f17d1 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -475,9 +475,12 @@ class Worker(object): job execution. """ with self.connection._pipeline() as pipeline: + timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) - self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + self.heartbeat(timeout, pipeline=pipeline) + working_queue = WorkingQueue(job.origin, self.connection) + working_queue.add(job, timeout, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -491,13 +494,15 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + working_queue = WorkingQueue(job.origin, self.connection) + try: job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() - # Pickle the result in the same try-except block since we need to - # use the same exc handling when pickling fails + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails job._result = rv self.set_current_job_id(None, pipeline=pipeline) @@ -508,12 +513,14 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) pipeline.execute() except Exception: # Use the public setter here, to immediately update Redis job.set_status(Status.FAILED) + working_queue.remove(job) self.handle_exception(job, *sys.exc_info()) return False diff --git a/rq/working_queue.py b/rq/working_queue.py index 64cf1e9..8959384 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -15,18 +15,22 @@ class WorkingQueue: Jobs whose score are lower than current time is considered "expired". """ - def __init__(self, name, connection=None): + def __init__(self, name='default', connection=None): self.name = name self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) - def add(self, job, timeout): + def add(self, job, timeout, pipeline=None): """Adds a job to WorkingQueue with expiry time of now + timeout.""" - return self.connection._zadd(self.key, current_timestamp() + timeout, - job.id) + score = current_timestamp() + timeout + if pipeline is not None: + return pipeline.zadd(self.key, score, job.id) - def remove(self, job): - return self.connection.zrem(self.key, job.id) + return self.connection._zadd(self.key, score, job.id) + + def remove(self, job, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + return connection.zrem(self.key, job.id) def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index e02ee2a..91f95ea 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,6 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status +from rq.working_queue import WorkingQueue from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -277,3 +278,18 @@ class TestWorker(RQTestCase): q = Queue() worker = Worker([q], job_class=CustomJob) self.assertEqual(worker.job_class, CustomJob) + + def test_prepare_job_execution(self): + """Prepare job execution does the necessary bookkeeping.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.prepare_job_execution(job) + + # Updates working queue + working_queue = WorkingQueue(connection=self.testconn) + self.assertEqual(working_queue.get_job_ids(), [job.id]) + + # Updates worker statuses + self.assertEqual(worker.state, 'busy') + self.assertEqual(worker.get_current_job_id(), job.id) diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index af6eeb4..249c035 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,18 +2,20 @@ from __future__ import absolute_import from rq.job import Job -from rq.queue import FailedQueue +from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp +from rq.worker import Worker from rq.working_queue import WorkingQueue from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue('default', connection=self.testconn) + self.working_queue = WorkingQueue(connection=self.testconn) def test_add_and_remove(self): """Adding and removing job to WorkingQueue.""" @@ -52,4 +54,25 @@ class TestQueue(RQTestCase): self.working_queue.cleanup() self.assertIn('foo', failed_queue.job_ids) - \ No newline at end of file + def test_job_execution(self): + """Job is removed from WorkingQueue after execution.""" + working_queue = WorkingQueue(connection=self.testconn) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + job = queue.enqueue(say_hello) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) + + # Job that fails + job = queue.enqueue(div_by_zero) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) From d667fb0713c9e20256c565f2d601967b9600aedb Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 7 Sep 2014 17:03:17 +0700 Subject: [PATCH 05/16] working_queue.remove call should be pipelined. --- rq/job.py | 5 +++-- rq/worker.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index ef7a266..bd91f13 100644 --- a/rq/job.py +++ b/rq/job.py @@ -147,9 +147,10 @@ class Job(object): ) return self.get_status() - def set_status(self, status): + def set_status(self, status, pipeline=None): self._status = status - self.connection.hset(self.key, 'status', self._status) + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'status', self._status) def _set_status(self, status): warnings.warn( diff --git a/rq/worker.py b/rq/worker.py index 41f17d1..21ddac6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -474,13 +474,15 @@ class Worker(object): """Performs misc bookkeeping like updating states prior to job execution. """ + timeout = (job.timeout or 180) + 60 + with self.connection._pipeline() as pipeline: - timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) working_queue = WorkingQueue(job.origin, self.connection) working_queue.add(job, timeout, pipeline=pipeline) + job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -497,7 +499,6 @@ class Worker(object): working_queue = WorkingQueue(job.origin, self.connection) try: - job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() @@ -518,9 +519,10 @@ class Worker(object): pipeline.execute() except Exception: - # Use the public setter here, to immediately update Redis - job.set_status(Status.FAILED) - working_queue.remove(job) + job.set_status(Status.FAILED, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) + pipeline.execute() + self.handle_exception(job, *sys.exc_info()) return False From 1158a0606cdfaa4a3e9829decb8f3d1d94b17ea8 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:35:18 +0700 Subject: [PATCH 06/16] Fixed Python 3 tests for "WorkingQueue". --- rq/working_queue.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rq/working_queue.py b/rq/working_queue.py index 8959384..6dc6826 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -1,3 +1,4 @@ +from .compat import as_text from .connections import resolve_connection from .queue import FailedQueue from .utils import current_timestamp @@ -34,11 +35,13 @@ class WorkingQueue: def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" - return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + return [as_text(job_id) for job_id in + self.connection.zrangebyscore(self.key, 0, current_timestamp())] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" - return self.connection.zrange(self.key, start, end) + return [as_text(job_id) for job_id in + self.connection.zrange(self.key, start, end)] def cleanup(self): """Removes expired job ids to FailedQueue.""" From 1047db0b3af844f1cc5cf62412da7b64fe712aed Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:39:48 +0700 Subject: [PATCH 07/16] Renamed WorkingQueue to StartedJobRegistry. --- rq/{working_queue.py => registry.py} | 8 ++--- rq/worker.py | 12 ++++---- tests/test_worker.py | 6 ++-- tests/test_working_queue.py | 46 ++++++++++++++-------------- 4 files changed, 36 insertions(+), 36 deletions(-) rename rq/{working_queue.py => registry.py} (90%) diff --git a/rq/working_queue.py b/rq/registry.py similarity index 90% rename from rq/working_queue.py rename to rq/registry.py index 6dc6826..2bf1445 100644 --- a/rq/working_queue.py +++ b/rq/registry.py @@ -4,10 +4,10 @@ from .queue import FailedQueue from .utils import current_timestamp -class WorkingQueue: +class StartedJobRegistry: """ - Registry of currently executing jobs. Each queue maintains a WorkingQueue. - WorkingQueue contains job keys that are currently being executed. + Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. + StartedJobRegistry contains job keys that are currently being executed. Each key is scored by job's expiration time (datetime started + timeout). Jobs are added to registry right before they are executed and removed @@ -22,7 +22,7 @@ class WorkingQueue: self.connection = resolve_connection(connection) def add(self, job, timeout, pipeline=None): - """Adds a job to WorkingQueue with expiry time of now + timeout.""" + """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) diff --git a/rq/worker.py b/rq/worker.py index 21ddac6..2a98f89 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .working_queue import WorkingQueue +from .registry import StartedJobRegistry try: from procname import setprocname @@ -480,8 +480,8 @@ class Worker(object): self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) - working_queue = WorkingQueue(job.origin, self.connection) - working_queue.add(job, timeout, pipeline=pipeline) + registry = StartedJobRegistry(job.origin, self.connection) + registry.add(job, timeout, pipeline=pipeline) job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - working_queue = WorkingQueue(job.origin, self.connection) + registry = StartedJobRegistry(job.origin, self.connection) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -514,13 +514,13 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) diff --git a/tests/test_worker.py b/tests/test_worker.py index 91f95ea..901a890 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,7 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -287,8 +287,8 @@ class TestWorker(RQTestCase): worker.prepare_job_execution(job) # Updates working queue - working_queue = WorkingQueue(connection=self.testconn) - self.assertEqual(working_queue.get_job_ids(), [job.id]) + registry = StartedJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_job_ids(), [job.id]) # Updates worker statuses self.assertEqual(worker.state, 'busy') diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 249c035..9830f22 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -5,7 +5,7 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -15,64 +15,64 @@ class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue(connection=self.testconn) + self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): - """Adding and removing job to WorkingQueue.""" + """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() job = Job() # Test that job is added with the right score - self.working_queue.add(job, 1000) - self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + self.registry.add(job, 1000) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1001) # Ensure that job is properly removed from sorted set - self.working_queue.remove(job) - self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + self.registry.remove(job) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) def test_get_job_ids(self): - """Getting job ids from WorkingQueue.""" - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, 10, 'bar') - self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + """Getting job ids from StartedJobRegistry.""" + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 10, 'bar') + self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): - """Getting expired job ids form WorkingQueue.""" + """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.working_queue.cleanup() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) def test_job_execution(self): - """Job is removed from WorkingQueue after execution.""" - working_queue = WorkingQueue(connection=self.testconn) + """Job is removed from StartedJobRegistry after execution.""" + registry = StartedJobRegistry(connection=self.testconn) queue = Queue(connection=self.testconn) worker = Worker([queue]) job = queue.enqueue(say_hello) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) # Job that fails job = queue.enqueue(div_by_zero) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) From 3dc008d090a5cb814524eddbe09f27829d034fb0 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:26:24 +0700 Subject: [PATCH 08/16] Added WorkingQueue class. --- rq/utils.py | 6 +++++ rq/working_queue.py | 38 ++++++++++++++++++++++++++++++++ tests/test_working_queue.py | 44 +++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 rq/working_queue.py create mode 100644 tests/test_working_queue.py diff --git a/rq/utils.py b/rq/utils.py index d875b26..8233ac6 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -8,6 +8,7 @@ terminal colorizing code, originally by Georg Brandl. from __future__ import (absolute_import, division, print_function, unicode_literals) +import calendar import importlib import datetime import logging @@ -202,3 +203,8 @@ def first(iterable, default=None, key=None): return el return default + + +def current_timestamp(): + """Returns current UTC timestamp""" + return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) diff --git a/rq/working_queue.py b/rq/working_queue.py new file mode 100644 index 0000000..3e5da45 --- /dev/null +++ b/rq/working_queue.py @@ -0,0 +1,38 @@ +from .connections import resolve_connection +from .queue import FailedQueue +from .utils import current_timestamp + + +class WorkingQueue: + """ + Registry of currently executing jobs. Each queue maintains a WorkingQueue. + WorkingQueue contains job keys that are currently being executed. + Each key is scored by job's expiration time (datetime started + timeout). + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name, connection=None): + self.name = name + self.key = 'rq:wip:%s' % name + self.connection = resolve_connection(connection) + + def add(self, job, timeout): + """Adds a job to WorkingQueue with expiry time of now + timeout.""" + return self.connection._zadd(self.key, current_timestamp() + timeout, + job.id) + + def remove(self, job): + return self.connection.zrem(self.key, job.id) + + def get_expired_job_ids(self): + """Returns job ids whose score are less than current timestamp.""" + return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + + def get_job_ids(self, start=0, end=-1): + """Returns list of all job ids.""" + return self.connection.zrange(self.key, start, end) + diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py new file mode 100644 index 0000000..9c1a1e5 --- /dev/null +++ b/tests/test_working_queue.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from rq.job import Job +from rq.utils import current_timestamp +from rq.working_queue import WorkingQueue + +from tests import RQTestCase + + +class TestQueue(RQTestCase): + + def setUp(self): + super(TestQueue, self).setUp() + self.working_queue = WorkingQueue('default', connection=self.testconn) + + def test_add_and_remove(self): + """Adding and removing job to WorkingQueue.""" + timestamp = current_timestamp() + job = Job() + + # Test that job is added with the right score + self.working_queue.add(job, 1000) + self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + timestamp + 1001) + + # Ensure that job is properly removed from sorted set + self.working_queue.remove(job) + self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + + def test_get_job_ids(self): + """Getting job ids from WorkingQueue.""" + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, 10, 'bar') + self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + + def test_get_expired_job_ids(self): + """Getting expired job ids form WorkingQueue.""" + timestamp = current_timestamp() + + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file From a28575088bf72da4f60d7ae7900a3d2512a4bd24 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 29 Jul 2014 18:37:22 +0700 Subject: [PATCH 09/16] Implemented WorkingQueue.cleanup(). --- rq/queue.py | 5 +++-- rq/working_queue.py | 12 ++++++++++++ tests/test_working_queue.py | 13 ++++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 86f0406..621942a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -158,9 +158,10 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id): + def push_job_id(self, job_id, pipeline=None): """Pushes a job ID on the corresponding Redis queue.""" - self.connection.rpush(self.key, job_id) + connection = pipeline if pipeline is not None else self.connection + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): diff --git a/rq/working_queue.py b/rq/working_queue.py index 3e5da45..64cf1e9 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -36,3 +36,15 @@ class WorkingQueue: """Returns list of all job ids.""" return self.connection.zrange(self.key, start, end) + def cleanup(self): + """Removes expired job ids to FailedQueue.""" + job_ids = self.get_expired_job_ids() + + if job_ids: + failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: + for job_id in job_ids: + failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.execute() + + return job_ids diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 9c1a1e5..af6eeb4 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from rq.job import Job +from rq.queue import FailedQueue from rq.utils import current_timestamp from rq.working_queue import WorkingQueue @@ -41,4 +42,14 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.working_queue.key, 1, 'foo') self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + + def test_cleanup(self): + """Moving expired jobs to FailedQueue.""" + failed_queue = FailedQueue(connection=self.testconn) + self.assertTrue(failed_queue.is_empty()) + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.working_queue.cleanup() + self.assertIn('foo', failed_queue.job_ids) + + \ No newline at end of file From b0c0a84ab026e68e587e0ddc7736545daff42acf Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 13:36:35 +0800 Subject: [PATCH 10/16] Moved some logic into worker.prepare_job_execution to make things testable. --- rq/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index be4a955..4348217 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION +from .working_queue import WorkingQueue try: from procname import setprocname @@ -403,7 +404,7 @@ class Worker(object): self.heartbeat() return result - def heartbeat(self, timeout=0): + def heartbeat(self, timeout=0, pipeline=None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -415,7 +416,8 @@ class Worker(object): only larger. """ timeout = max(timeout, self.default_worker_ttl) - self.connection.expire(self.key, timeout) + connection = pipeline if pipeline is not None else self.connection + connection.expire(self.key, timeout) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) @@ -468,19 +470,26 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def perform_job(self, job): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. + def prepare_job_execution(self, job): + """Performs misc bookkeeping like updating states prior to + job execution. """ - - self.set_state('busy') - self.set_current_job_id(job.id) - self.heartbeat((job.timeout or 180) + 60) + with self.connection._pipeline() as pipeline: + self.set_state('busy', pipeline=pipeline) + self.set_current_job_id(job.id, pipeline=pipeline) + self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + pipeline.execute() self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ + self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: try: job.set_status(Status.STARTED) From 4d90cc062ea82afbda8f0631db7b9d78eb2f494d Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 16:06:39 +0800 Subject: [PATCH 11/16] Add job to WorkingQueue before execution and remove from WorkingQueue after. --- rq/worker.py | 13 ++++++++++--- rq/working_queue.py | 16 ++++++++++------ tests/test_worker.py | 20 ++++++++++++++++++-- tests/test_working_queue.py | 29 ++++++++++++++++++++++++++--- 4 files changed, 64 insertions(+), 14 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 4348217..3fa0611 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -475,9 +475,12 @@ class Worker(object): job execution. """ with self.connection._pipeline() as pipeline: + timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) - self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + self.heartbeat(timeout, pipeline=pipeline) + working_queue = WorkingQueue(job.origin, self.connection) + working_queue.add(job, timeout, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -491,13 +494,15 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + working_queue = WorkingQueue(job.origin, self.connection) + try: job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() - # Pickle the result in the same try-except block since we need to - # use the same exc handling when pickling fails + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails job._result = rv self.set_current_job_id(None, pipeline=pipeline) @@ -508,12 +513,14 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) pipeline.execute() except Exception: # Use the public setter here, to immediately update Redis job.set_status(Status.FAILED) + working_queue.remove(job) self.handle_exception(job, *sys.exc_info()) return False diff --git a/rq/working_queue.py b/rq/working_queue.py index 64cf1e9..8959384 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -15,18 +15,22 @@ class WorkingQueue: Jobs whose score are lower than current time is considered "expired". """ - def __init__(self, name, connection=None): + def __init__(self, name='default', connection=None): self.name = name self.key = 'rq:wip:%s' % name self.connection = resolve_connection(connection) - def add(self, job, timeout): + def add(self, job, timeout, pipeline=None): """Adds a job to WorkingQueue with expiry time of now + timeout.""" - return self.connection._zadd(self.key, current_timestamp() + timeout, - job.id) + score = current_timestamp() + timeout + if pipeline is not None: + return pipeline.zadd(self.key, score, job.id) - def remove(self, job): - return self.connection.zrem(self.key, job.id) + return self.connection._zadd(self.key, score, job.id) + + def remove(self, job, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + return connection.zrem(self.key, job.id) def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 8aef2ac..e51b60f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,10 +7,11 @@ import os from rq import get_failed_queue, Queue, Worker, SimpleWorker from rq.compat import as_text from rq.job import Job, Status +from rq.working_queue import WorkingQueue from tests import RQTestCase, slow -from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, - say_hello, say_pid) +from tests.fixtures import (create_file, create_file_after_timeout, + div_by_zero, say_hello, say_pid) from tests.helpers import strip_microseconds @@ -291,3 +292,18 @@ class TestWorker(RQTestCase): 'Expected at least some work done.') self.assertEquals(job.result, os.getpid(), 'PID mismatch, fork() is not supposed to happen here') + + def test_prepare_job_execution(self): + """Prepare job execution does the necessary bookkeeping.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.prepare_job_execution(job) + + # Updates working queue + working_queue = WorkingQueue(connection=self.testconn) + self.assertEqual(working_queue.get_job_ids(), [job.id]) + + # Updates worker statuses + self.assertEqual(worker.state, 'busy') + self.assertEqual(worker.get_current_job_id(), job.id) diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index af6eeb4..249c035 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,18 +2,20 @@ from __future__ import absolute_import from rq.job import Job -from rq.queue import FailedQueue +from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp +from rq.worker import Worker from rq.working_queue import WorkingQueue from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue('default', connection=self.testconn) + self.working_queue = WorkingQueue(connection=self.testconn) def test_add_and_remove(self): """Adding and removing job to WorkingQueue.""" @@ -52,4 +54,25 @@ class TestQueue(RQTestCase): self.working_queue.cleanup() self.assertIn('foo', failed_queue.job_ids) - \ No newline at end of file + def test_job_execution(self): + """Job is removed from WorkingQueue after execution.""" + working_queue = WorkingQueue(connection=self.testconn) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + job = queue.enqueue(say_hello) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) + + # Job that fails + job = queue.enqueue(div_by_zero) + + worker.prepare_job_execution(job) + self.assertIn(job.id, working_queue.get_job_ids()) + + worker.perform_job(job) + self.assertNotIn(job.id, working_queue.get_job_ids()) From 60c7a3cc6e90bf167f8c98e3232793718846918f Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 7 Sep 2014 17:03:17 +0700 Subject: [PATCH 12/16] working_queue.remove call should be pipelined. --- rq/job.py | 5 +++-- rq/worker.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index ef7a266..bd91f13 100644 --- a/rq/job.py +++ b/rq/job.py @@ -147,9 +147,10 @@ class Job(object): ) return self.get_status() - def set_status(self, status): + def set_status(self, status, pipeline=None): self._status = status - self.connection.hset(self.key, 'status', self._status) + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'status', self._status) def _set_status(self, status): warnings.warn( diff --git a/rq/worker.py b/rq/worker.py index 3fa0611..011731e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -474,13 +474,15 @@ class Worker(object): """Performs misc bookkeeping like updating states prior to job execution. """ + timeout = (job.timeout or 180) + 60 + with self.connection._pipeline() as pipeline: - timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) working_queue = WorkingQueue(job.origin, self.connection) working_queue.add(job, timeout, pipeline=pipeline) + job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -497,7 +499,6 @@ class Worker(object): working_queue = WorkingQueue(job.origin, self.connection) try: - job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() @@ -518,9 +519,10 @@ class Worker(object): pipeline.execute() except Exception: - # Use the public setter here, to immediately update Redis - job.set_status(Status.FAILED) - working_queue.remove(job) + job.set_status(Status.FAILED, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) + pipeline.execute() + self.handle_exception(job, *sys.exc_info()) return False From 2e96148b31dee34cb7c259db772613fd18aff8be Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:35:18 +0700 Subject: [PATCH 13/16] Fixed Python 3 tests for "WorkingQueue". --- rq/working_queue.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rq/working_queue.py b/rq/working_queue.py index 8959384..6dc6826 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -1,3 +1,4 @@ +from .compat import as_text from .connections import resolve_connection from .queue import FailedQueue from .utils import current_timestamp @@ -34,11 +35,13 @@ class WorkingQueue: def get_expired_job_ids(self): """Returns job ids whose score are less than current timestamp.""" - return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + return [as_text(job_id) for job_id in + self.connection.zrangebyscore(self.key, 0, current_timestamp())] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" - return self.connection.zrange(self.key, start, end) + return [as_text(job_id) for job_id in + self.connection.zrange(self.key, start, end)] def cleanup(self): """Removes expired job ids to FailedQueue.""" From 9341a4a33d53e59a29d2f90bee17b41a2b408830 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:39:48 +0700 Subject: [PATCH 14/16] Renamed WorkingQueue to StartedJobRegistry. --- rq/{working_queue.py => registry.py} | 8 ++--- rq/worker.py | 12 ++++---- tests/test_worker.py | 6 ++-- tests/test_working_queue.py | 46 ++++++++++++++-------------- 4 files changed, 36 insertions(+), 36 deletions(-) rename rq/{working_queue.py => registry.py} (90%) diff --git a/rq/working_queue.py b/rq/registry.py similarity index 90% rename from rq/working_queue.py rename to rq/registry.py index 6dc6826..2bf1445 100644 --- a/rq/working_queue.py +++ b/rq/registry.py @@ -4,10 +4,10 @@ from .queue import FailedQueue from .utils import current_timestamp -class WorkingQueue: +class StartedJobRegistry: """ - Registry of currently executing jobs. Each queue maintains a WorkingQueue. - WorkingQueue contains job keys that are currently being executed. + Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. + StartedJobRegistry contains job keys that are currently being executed. Each key is scored by job's expiration time (datetime started + timeout). Jobs are added to registry right before they are executed and removed @@ -22,7 +22,7 @@ class WorkingQueue: self.connection = resolve_connection(connection) def add(self, job, timeout, pipeline=None): - """Adds a job to WorkingQueue with expiry time of now + timeout.""" + """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) diff --git a/rq/worker.py b/rq/worker.py index 011731e..c9fabbd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .working_queue import WorkingQueue +from .registry import StartedJobRegistry try: from procname import setprocname @@ -480,8 +480,8 @@ class Worker(object): self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) - working_queue = WorkingQueue(job.origin, self.connection) - working_queue.add(job, timeout, pipeline=pipeline) + registry = StartedJobRegistry(job.origin, self.connection) + registry.add(job, timeout, pipeline=pipeline) job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - working_queue = WorkingQueue(job.origin, self.connection) + registry = StartedJobRegistry(job.origin, self.connection) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -514,13 +514,13 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) diff --git a/tests/test_worker.py b/tests/test_worker.py index e51b60f..1a0bf1c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,7 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker, SimpleWorker from rq.compat import as_text from rq.job import Job, Status -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, @@ -301,8 +301,8 @@ class TestWorker(RQTestCase): worker.prepare_job_execution(job) # Updates working queue - working_queue = WorkingQueue(connection=self.testconn) - self.assertEqual(working_queue.get_job_ids(), [job.id]) + registry = StartedJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_job_ids(), [job.id]) # Updates worker statuses self.assertEqual(worker.state, 'busy') diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 249c035..9830f22 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -5,7 +5,7 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -15,64 +15,64 @@ class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue(connection=self.testconn) + self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): - """Adding and removing job to WorkingQueue.""" + """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() job = Job() # Test that job is added with the right score - self.working_queue.add(job, 1000) - self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + self.registry.add(job, 1000) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1001) # Ensure that job is properly removed from sorted set - self.working_queue.remove(job) - self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + self.registry.remove(job) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) def test_get_job_ids(self): - """Getting job ids from WorkingQueue.""" - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, 10, 'bar') - self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + """Getting job ids from StartedJobRegistry.""" + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 10, 'bar') + self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): - """Getting expired job ids form WorkingQueue.""" + """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.working_queue.cleanup() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) def test_job_execution(self): - """Job is removed from WorkingQueue after execution.""" - working_queue = WorkingQueue(connection=self.testconn) + """Job is removed from StartedJobRegistry after execution.""" + registry = StartedJobRegistry(connection=self.testconn) queue = Queue(connection=self.testconn) worker = Worker([queue]) job = queue.enqueue(say_hello) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) # Job that fails job = queue.enqueue(div_by_zero) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) From dc12f8aee5b725bb4fcb16ffa0e7d9c7534008c9 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 22:54:04 +0700 Subject: [PATCH 15/16] Fixed random registry failures when run on slower machines. --- tests/{test_working_queue.py => test_job_started_registry.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/{test_working_queue.py => test_job_started_registry.py} (98%) diff --git a/tests/test_working_queue.py b/tests/test_job_started_registry.py similarity index 98% rename from tests/test_working_queue.py rename to tests/test_job_started_registry.py index 9830f22..e8241b5 100644 --- a/tests/test_working_queue.py +++ b/tests/test_job_started_registry.py @@ -25,7 +25,7 @@ class TestQueue(RQTestCase): # Test that job is added with the right score self.registry.add(job, 1000) self.assertLess(self.testconn.zscore(self.registry.key, job.id), - timestamp + 1001) + timestamp + 1002) # Ensure that job is properly removed from sorted set self.registry.remove(job) From 6d79082b62ac5b2af48d2a91618e31c07ca3768c Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 8 Sep 2014 23:04:18 +0700 Subject: [PATCH 16/16] Call move_expired_jobs_to_failed_queue before returning job_ids. --- rq/registry.py | 5 +++-- tests/test_job_started_registry.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 2bf1445..afa7b5b 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -40,11 +40,12 @@ class StartedJobRegistry: def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" + self.move_expired_jobs_to_failed_queue() return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] - def cleanup(self): - """Removes expired job ids to FailedQueue.""" + def move_expired_jobs_to_failed_queue(self): + """Remove expired jobs from registry and add them to FailedQueue.""" job_ids = self.get_expired_job_ids() if job_ids: diff --git a/tests/test_job_started_registry.py b/tests/test_job_started_registry.py index e8241b5..addb1db 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_job_started_registry.py @@ -51,7 +51,7 @@ class TestQueue(RQTestCase): failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) self.testconn.zadd(self.registry.key, 1, 'foo') - self.registry.cleanup() + self.registry.move_expired_jobs_to_failed_queue() self.assertIn('foo', failed_queue.job_ids) def test_job_execution(self):