diff --git a/rq/registry.py b/rq/registry.py index a1b89b2..b4cf43f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,9 +8,6 @@ class BaseRegistry(object): """ Base implementation of job registry, implemented in Redis sorted set. Each job is stored as a key in the registry, scored by expiration time (unix timestamp). - - Jobs with scores are lower than current time is considered "expired" and - should be cleaned up. """ def __init__(self, name='default', connection=None): @@ -27,9 +24,9 @@ class BaseRegistry(object): self.cleanup() return self.connection.zcard(self.key) - def add(self, job, timeout, pipeline=None): - """Adds a job to a registry with expiry time of now + timeout.""" - score = timeout if timeout == -1 else current_timestamp() + timeout + def add(self, job, ttl, pipeline=None): + """Adds a job to a registry with expiry time of now + ttl.""" + score = ttl if ttl < 0 else current_timestamp() + ttl if pipeline is not None: return pipeline.zadd(self.key, score, job.id) @@ -39,10 +36,16 @@ class BaseRegistry(object): connection = pipeline if pipeline is not None else self.connection return connection.zrem(self.key, job.id) - def get_expired_job_ids(self): - """Returns job ids whose score are less than current timestamp.""" + def get_expired_job_ids(self, timestamp=None): + """Returns job ids whose score are less than current timestamp. + + Returns ids for jobs with an expiry time earlier than timestamp, + specified as seconds since the Unix epoch. timestamp defaults to call + time if unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, current_timestamp())] + self.connection.zrangebyscore(self.key, 0, score)] def get_job_ids(self, start=0, end=-1): """Returns list of all job ids.""" @@ -59,24 +62,28 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). - - Jobs whose score are lower than current time is considered "expired". """ def __init__(self, name='default', connection=None): super(StartedJobRegistry, self).__init__(name, connection) self.key = 'rq:wip:%s' % name - def cleanup(self): - """Remove expired jobs from registry and add them to FailedQueue.""" - job_ids = self.get_expired_job_ids() + def cleanup(self, timestamp=None): + """Remove expired jobs from registry and add them to FailedQueue. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. Removed jobs are added to the global failed job queue. + """ + score = timestamp if timestamp is not None else current_timestamp() + job_ids = self.get_expired_job_ids(score) if job_ids: failed_queue = FailedQueue(connection=self.connection) with self.connection.pipeline() as pipeline: for job_id in job_ids: failed_queue.push_job_id(job_id, pipeline=pipeline) - pipeline.zremrangebyscore(self.key, 0, current_timestamp()) + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() return job_ids @@ -92,6 +99,12 @@ class FinishedJobRegistry(BaseRegistry): super(FinishedJobRegistry, self).__init__(name, connection) self.key = 'rq:finished:%s' % name - def cleanup(self): - """Remove expired jobs from registry.""" - self.connection.zremrangebyscore(self.key, 0, current_timestamp()) + def cleanup(self, timestamp=None): + """Remove expired jobs from registry. + + Removes jobs with an expiry time earlier than timestamp, specified as + seconds since the Unix epoch. timestamp defaults to call time if + unspecified. + """ + score = timestamp if timestamp is not None else current_timestamp() + self.connection.zremrangebyscore(self.key, 0, score) diff --git a/tests/test_registry.py b/tests/test_registry.py index 36b9792..26470e3 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -48,14 +48,22 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), + ['foo', 'bar']) def test_cleanup(self): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 1, 'foo') + self.testconn.zadd(self.registry.key, 2, 'foo') + + self.registry.cleanup(1) + self.assertNotIn('foo', failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.registry.cleanup() self.assertIn('foo', failed_queue.job_ids) self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) @@ -103,9 +111,14 @@ class TestFinishedJobRegistry(RQTestCase): timestamp = current_timestamp() self.testconn.zadd(self.registry.key, 1, 'foo') self.testconn.zadd(self.registry.key, timestamp + 10, 'bar') + self.testconn.zadd(self.registry.key, timestamp + 30, 'baz') self.registry.cleanup() - self.assertEqual(self.registry.get_job_ids(), ['bar']) + self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz']) + + self.registry.cleanup(timestamp + 20) + self.assertEqual(self.registry.get_job_ids(), ['baz']) + def test_jobs_are_put_in_registry(self): """Completed jobs are added to FinishedJobRegistry."""