Merge pull request #529 from selwin/registry-cleanup-bug

Jobs from FinishedJobRegistry should have "failed" as status when moved to FailedQueue
main
Selwin Ong 10 years ago
commit 719243dbad

@ -1,5 +1,7 @@
from .compat import as_text from .compat import as_text
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .job import Job, JobStatus
from .queue import FailedQueue from .queue import FailedQueue
from .utils import current_timestamp from .utils import current_timestamp
@ -80,9 +82,17 @@ class StartedJobRegistry(BaseRegistry):
if job_ids: if job_ids:
failed_queue = FailedQueue(connection=self.connection) failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
try:
job = Job.fetch(job_id, connection=self.connection)
job.status = JobStatus.FAILED
job.save(pipeline=pipeline)
failed_queue.push_job_id(job_id, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError:
pass
pipeline.zremrangebyscore(self.key, 0, score) pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute() pipeline.execute()

@ -2,7 +2,7 @@
from __future__ import absolute_import from __future__ import absolute_import
from rq.compat import as_text from rq.compat import as_text
from rq.job import Job from rq.job import Job, JobStatus
from rq.queue import FailedQueue, Queue from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
@ -60,15 +60,21 @@ class TestRegistry(RQTestCase):
"""Moving expired jobs to FailedQueue.""" """Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn) failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty()) self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 2, 'foo')
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, 2, job.id)
self.registry.cleanup(1) self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids) self.assertNotIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2)
self.registry.cleanup() self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids) self.assertIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
job.refresh()
self.assertEqual(job.status, JobStatus.FAILED)
def test_job_execution(self): def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution.""" """Job is removed from StartedJobRegistry after execution."""

Loading…
Cancel
Save