diff --git a/rq/registry.py b/rq/registry.py index 08798eb..1180d4f 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,5 +1,7 @@ from .compat import as_text from .connections import resolve_connection +from .exceptions import NoSuchJobError +from .job import Job, JobStatus from .queue import FailedQueue from .utils import current_timestamp @@ -80,9 +82,17 @@ class StartedJobRegistry(BaseRegistry): if job_ids: failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: for job_id in job_ids: - failed_queue.push_job_id(job_id, pipeline=pipeline) + try: + job = Job.fetch(job_id, connection=self.connection) + job.status = JobStatus.FAILED + job.save(pipeline=pipeline) + failed_queue.push_job_id(job_id, pipeline=pipeline) + except NoSuchJobError: + pass + pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() diff --git a/tests/test_registry.py b/tests/test_registry.py index f54b315..628636a 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from rq.compat import as_text -from rq.job import Job +from rq.job import Job, JobStatus from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker @@ -60,15 +60,21 @@ class TestRegistry(RQTestCase): """Moving expired jobs to FailedQueue.""" failed_queue = FailedQueue(connection=self.testconn) self.assertTrue(failed_queue.is_empty()) - self.testconn.zadd(self.registry.key, 2, 'foo') + + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + self.testconn.zadd(self.registry.key, 2, job.id) self.registry.cleanup(1) - self.assertNotIn('foo', failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2) + self.assertNotIn(job.id, failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2) self.registry.cleanup() - self.assertIn('foo', failed_queue.job_ids) - self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None) + self.assertIn(job.id, failed_queue.job_ids) + self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None) + job.refresh() + self.assertEqual(job.status, JobStatus.FAILED) def test_job_execution(self): """Job is removed from StartedJobRegistry after execution."""