Added FinishedJobRegistry to monitor finished jobs.

main
Selwin Ong 10 years ago
parent 74a9982ecb
commit 2307bc8253

@ -80,3 +80,18 @@ class StartedJobRegistry(BaseRegistry):
pipeline.execute()
return job_ids
class FinishedJobRegistry(BaseRegistry):
"""
Registry of jobs that have been completed. Jobs are added to this
registry after they have successfully completed for monitoring purposes.
"""
def __init__(self, name='default', connection=None):
super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name
def cleanup(self):
"""Remove expired jobs from registry."""
self.connection.zremrangebyscore(self.key, 0, current_timestamp())

@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION
from .registry import StartedJobRegistry
from .registry import FinishedJobRegistry, StartedJobRegistry
try:
from procname import setprocname
@ -496,7 +496,7 @@ class Worker(object):
self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline:
registry = StartedJobRegistry(job.origin, self.connection)
started_job_registry = StartedJobRegistry(job.origin)
try:
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
@ -513,14 +513,18 @@ class Worker(object):
job.ended_at = utcnow()
job._status = Status.FINISHED
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin)
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
except Exception:
job.set_status(Status.FAILED, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
return False

@ -5,16 +5,16 @@ from rq.job import Job
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import StartedJobRegistry
from rq.registry import FinishedJobRegistry, StartedJobRegistry
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class TestQueue(RQTestCase):
class TestRegistry(RQTestCase):
def setUp(self):
super(TestQueue, self).setUp()
super(TestRegistry, self).setUp()
self.registry = StartedJobRegistry(connection=self.testconn)
def test_add_and_remove(self):
@ -86,3 +86,35 @@ class TestQueue(RQTestCase):
self.testconn.zadd(self.registry.key, timestamp, 'bar')
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
class TestFinishedJobRegistry(RQTestCase):
def setUp(self):
super(TestFinishedJobRegistry, self).setUp()
self.registry = FinishedJobRegistry(connection=self.testconn)
def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar'])
def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), [])
queue = Queue(connection=self.testconn)
worker = Worker([queue])
# Completed jobs are put in FinishedJobRegistry
job = queue.enqueue(say_hello)
worker.perform_job(job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
# Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
Loading…
Cancel
Save