Jobs from FinishedJobRegistry that are moved to FailedQueue should have "failed" as status.

main
Selwin Ong 10 years ago
parent 9895cb8dae
commit 70d5f971bd

@ -1,5 +1,7 @@
from .compat import as_text
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .job import Job, JobStatus
from .queue import FailedQueue
from .utils import current_timestamp
@ -80,9 +82,17 @@ class StartedJobRegistry(BaseRegistry):
if job_ids:
failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline)
try:
job = Job.fetch(job_id, connection=self.connection)
job.status = JobStatus.FAILED
job.save(pipeline=pipeline)
failed_queue.push_job_id(job_id, pipeline=pipeline)
except NoSuchJobError:
pass
pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute()

@ -2,7 +2,7 @@
from __future__ import absolute_import
from rq.compat import as_text
from rq.job import Job
from rq.job import Job, JobStatus
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
@ -60,15 +60,21 @@ class TestRegistry(RQTestCase):
"""Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 2, 'foo')
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, 2, job.id)
self.registry.cleanup(1)
self.assertNotIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), 2)
self.assertNotIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2)
self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
self.assertIn(job.id, failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
job.refresh()
self.assertEqual(job.status, JobStatus.FAILED)
def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""

Loading…
Cancel
Save