diff --git a/rq/registry.py b/rq/registry.py index 59becfe..c59bf88 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -80,3 +80,18 @@ class StartedJobRegistry(BaseRegistry): pipeline.execute() return job_ids + + +class FinishedJobRegistry(BaseRegistry): + """ + Registry of jobs that have been completed. Jobs are added to this + registry after they have successfully completed for monitoring purposes. + """ + + def __init__(self, name='default', connection=None): + super(FinishedJobRegistry, self).__init__(name, connection) + self.key = 'rq:finished:%s' % name + + def cleanup(self): + """Remove expired jobs from registry.""" + self.connection.zremrangebyscore(self.key, 0, current_timestamp()) diff --git a/rq/worker.py b/rq/worker.py index fbf981d..48e22a9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION -from .registry import StartedJobRegistry +from .registry import FinishedJobRegistry, StartedJobRegistry try: from procname import setprocname @@ -496,7 +496,7 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: - registry = StartedJobRegistry(job.origin, self.connection) + started_job_registry = StartedJobRegistry(job.origin) try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): @@ -513,14 +513,18 @@ class Worker(object): job.ended_at = utcnow() job._status = Status.FINISHED job.save(pipeline=pipeline) + + finished_job_registry = FinishedJobRegistry(job.origin) + finished_job_registry.add(job, result_ttl, pipeline) + job.cleanup(result_ttl, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() except Exception: job.set_status(Status.FAILED, pipeline=pipeline) - registry.remove(job, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() self.handle_exception(job, *sys.exc_info()) return False diff --git a/tests/test_job_started_registry.py b/tests/test_registry.py similarity index 70% rename from tests/test_job_started_registry.py rename to tests/test_registry.py index eba1c36..ce4a345 100644 --- a/tests/test_job_started_registry.py +++ b/tests/test_registry.py @@ -5,16 +5,16 @@ from rq.job import Job from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import StartedJobRegistry +from rq.registry import FinishedJobRegistry, StartedJobRegistry from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello -class TestQueue(RQTestCase): +class TestRegistry(RQTestCase): def setUp(self): - super(TestQueue, self).setUp() + super(TestRegistry, self).setUp() self.registry = StartedJobRegistry(connection=self.testconn) def test_add_and_remove(self): @@ -86,3 +86,35 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.registry.key, timestamp, 'bar') self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) + + +class TestFinishedJobRegistry(RQTestCase): + + def setUp(self): + super(TestFinishedJobRegistry, self).setUp() + self.registry = FinishedJobRegistry(connection=self.testconn) + + def test_cleanup(self): + """Finished job registry removes expired jobs.""" + timestamp = current_timestamp() + self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + + self.registry.cleanup() + self.assertEqual(self.registry.get_job_ids(), ['bar']) + + def test_jobs_are_put_in_registry(self): + """Completed jobs are added to FinishedJobRegistry.""" + self.assertEqual(self.registry.get_job_ids(), []) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + # Completed jobs are put in FinishedJobRegistry + job = queue.enqueue(say_hello) + worker.perform_job(job) + self.assertEqual(self.registry.get_job_ids(), [job.id]) + + # Failed jobs are not put in FinishedJobRegistry + failed_job = queue.enqueue(div_by_zero) + worker.perform_job(failed_job) + self.assertEqual(self.registry.get_job_ids(), [job.id])