diff --git a/rq/working_queue.py b/rq/registry.py similarity index 90% rename from rq/working_queue.py rename to rq/registry.py index 6dc6826..2bf1445 100644 --- a/rq/working_queue.py +++ b/rq/registry.py @@ -4,10 +4,10 @@ from .queue import FailedQueue from .utils import current_timestamp -class WorkingQueue: +class StartedJobRegistry: """ - Registry of currently executing jobs. Each queue maintains a WorkingQueue. - WorkingQueue contains job keys that are currently being executed. + Registry of currently executing jobs. Each queue maintains a StartedJobRegistry. + StartedJobRegistry contains job keys that are currently being executed. Each key is scored by job's expiration time (datetime started + timeout). Jobs are added to registry right before they are executed and removed @@ -22,7 +22,7 @@ class WorkingQueue: self.connection = resolve_connection(connection) def add(self, job, timeout, pipeline=None): - """Adds a job to WorkingQueue with expiry time of now + timeout.""" + """Adds a job to StartedJobRegistry with expiry time of now + timeout.""" score = current_timestamp() + timeout if pipeline is not None: return pipeline.zadd(self.key, score, job.id) diff --git a/rq/worker.py b/rq/worker.py index 21ddac6..2a98f89 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .working_queue import WorkingQueue +from .registry import StartedJobRegistry try: from procname import setprocname @@ -480,8 +480,8 @@ class Worker(object): self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) - working_queue = WorkingQueue(job.origin, self.connection) - working_queue.add(job, timeout, pipeline=pipeline) + registry = StartedJobRegistry(job.origin, self.connection) + registry.add(job, timeout, pipeline=pipeline) job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - working_queue = WorkingQueue(job.origin, self.connection) + registry = StartedJobRegistry(job.origin, self.connection) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -514,13 +514,13 @@ class Worker(object): job._status = Status.FINISHED job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - working_queue.remove(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) diff --git a/tests/test_worker.py b/tests/test_worker.py index 91f95ea..901a890 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -7,7 +7,7 @@ import os from rq import get_failed_queue, Queue, Worker from rq.compat import as_text from rq.job import Job, Status -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, @@ -287,8 +287,8 @@ class TestWorker(RQTestCase): worker.prepare_job_execution(job) # Updates working queue - working_queue = WorkingQueue(connection=self.testconn) - self.assertEqual(working_queue.get_job_ids(), [job.id]) + registry = StartedJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_job_ids(), [job.id]) # Updates worker statuses self.assertEqual(worker.state, 'busy') diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 249c035..9830f22 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -5,7 +5,7 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.working_queue import WorkingQueue +from rq.registry import StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -15,64 +15,64 @@ class TestQueue(RQTestCase): def setUp(self): super(TestQueue, self).setUp() - self.working_queue = WorkingQueue(connection=self.testconn) + self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): - """Adding and removing job to WorkingQueue.""" + """Adding and removing job to StartedJobRegistry.""" timestamp = current_timestamp() job = Job() # Test that job is added with the right score - self.working_queue.add(job, 1000) - self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + self.registry.add(job, 1000) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1001) # Ensure that job is properly removed from sorted set - self.working_queue.remove(job) - self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + self.registry.remove(job) + self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) def test_get_job_ids(self): - """Getting job ids from WorkingQueue.""" - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, 10, 'bar') - self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + """Getting job ids from StartedJobRegistry.""" + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 10, 'bar') + self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar']) def test_get_expired_job_ids(self): - """Getting expired job ids form WorkingQueue.""" + """Getting expired job ids form StartedJobRegistry.""" timestamp = current_timestamp() - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.working_queue.key, 1, 'foo') - self.working_queue.cleanup() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) def test_job_execution(self): - """Job is removed from WorkingQueue after execution.""" - working_queue = WorkingQueue(connection=self.testconn) + """Job is removed from StartedJobRegistry after execution.""" + registry = StartedJobRegistry(connection=self.testconn) queue = Queue(connection=self.testconn) worker = Worker([queue]) job = queue.enqueue(say_hello) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids()) # Job that fails job = queue.enqueue(div_by_zero) worker.prepare_job_execution(job) - self.assertIn(job.id, working_queue.get_job_ids()) + self.assertIn(job.id, registry.get_job_ids()) worker.perform_job(job) - self.assertNotIn(job.id, working_queue.get_job_ids()) + self.assertNotIn(job.id, registry.get_job_ids())