Allow maximum job age to be specified when cleaning up or getting expired jobs from a registry.

This ensures that all jobs cleaned from a started registry end up in the failed queue.
main
Nic Cope 10 years ago
parent 6ab7070a92
commit 09cab7a90d

@ -8,9 +8,6 @@ class BaseRegistry(object):
""" """
Base implementation of job registry, implemented in Redis sorted set. Each job Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp). is stored as a key in the registry, scored by expiration time (unix timestamp).
Jobs whose scores are lower than the current time are considered "expired" and
should be cleaned up.
""" """
def __init__(self, name='default', connection=None): def __init__(self, name='default', connection=None):
@ -39,10 +36,16 @@ class BaseRegistry(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
return connection.zrem(self.key, job.id) return connection.zrem(self.key, job.id)
def get_expired_job_ids(self): def get_expired_job_ids(self, timestamp=None):
"""Returns job ids whose score are less than current timestamp.""" """Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
specified as seconds since the Unix epoch. timestamp defaults to call
time if unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, current_timestamp())] self.connection.zrangebyscore(self.key, 0, score)]
def get_job_ids(self, start=0, end=-1): def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids.""" """Returns list of all job ids."""
@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry):
Jobs are added to registry right before they are executed and removed Jobs are added to registry right before they are executed and removed
right after completion (success or failure). right after completion (success or failure).
Jobs whose scores are lower than the current time are considered "expired".
""" """
def __init__(self, name='default', connection=None): def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection) super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name self.key = 'rq:wip:%s' % name
def cleanup(self): def cleanup(self, timestamp=None):
"""Remove expired jobs from registry and add them to FailedQueue.""" """Remove expired jobs from registry and add them to FailedQueue.
job_ids = self.get_expired_job_ids()
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified. Removed jobs are added to the global failed job queue.
"""
score = timestamp if timestamp is not None else current_timestamp()
job_ids = self.get_expired_job_ids(score)
if job_ids: if job_ids:
failed_queue = FailedQueue(connection=self.connection) failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.zremrangebyscore(self.key, 0, current_timestamp()) pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute() pipeline.execute()
return job_ids return job_ids
@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry):
super(FinishedJobRegistry, self).__init__(name, connection) super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name self.key = 'rq:finished:%s' % name
def cleanup(self): def cleanup(self, timestamp=None):
"""Remove expired jobs from registry.""" """Remove expired jobs from registry.
self.connection.zremrangebyscore(self.key, 0, current_timestamp())
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)

@ -48,14 +48,22 @@ class TestRegistry(RQTestCase):
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
['foo', 'bar'])
def test_cleanup(self): def test_cleanup(self):
"""Moving expired jobs to FailedQueue.""" """Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn) failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty()) self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 2, 'foo')
self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2)
self.registry.cleanup() self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids) self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
@ -103,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase):
timestamp = current_timestamp() timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.registry.cleanup() self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar']) self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz'])
def test_jobs_are_put_in_registry(self): def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry.""" """Completed jobs are added to FinishedJobRegistry."""

Loading…
Cancel
Save