FailedJobRegistry.requeue() resets job.started_at and job.ended_at (#1227)

main
Selwin Ong 5 years ago committed by GitHub
parent 636d6d2f54
commit cfe389bd65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -531,10 +531,9 @@ class Job(object):
obj['description'] = self.description obj['description'] = self.description
if self.enqueued_at is not None: if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at) obj['enqueued_at'] = utcformat(self.enqueued_at)
if self.started_at is not None:
obj['started_at'] = utcformat(self.started_at) obj['started_at'] = utcformat(self.started_at) if self.started_at else ''
if self.ended_at is not None: obj['ended_at'] = utcformat(self.ended_at) if self.ended_at else ''
obj['ended_at'] = utcformat(self.ended_at)
if self._result is not None: if self._result is not None:
try: try:
obj['result'] = dumps(self._result) obj['result'] = dumps(self._result)

@ -214,11 +214,16 @@ class FailedJobRegistry(BaseRegistry):
result = self.connection.zrem(self.key, job.id) result = self.connection.zrem(self.key, job.id)
if not result: if not result:
raise InvalidJobOperation raise InvalidJobOperation
queue = Queue(job.origin, connection=self.connection, with self.connection.pipeline() as pipeline:
job_class=self.job_class) queue = Queue(job.origin, connection=self.connection,
job_class=self.job_class)
return queue.enqueue_job(job) job.started_at = None
job.ended_at = None
job.save()
job = queue.enqueue_job(job, pipeline=pipeline)
pipeline.execute()
return job
class DeferredJobRegistry(BaseRegistry): class DeferredJobRegistry(BaseRegistry):

@ -226,7 +226,7 @@ def backend_class(holder, default_name, override=None):
def str_to_date(date_str): def str_to_date(date_str):
if date_str is None: if not date_str:
return return
else: else:
return utcparse(as_text(date_str)) return utcparse(as_text(date_str))

@ -219,7 +219,7 @@ class TestJob(RQTestCase):
# ... and no other keys are stored # ... and no other keys are stored
self.assertEqual( self.assertEqual(
sorted(self.testconn.hkeys(job.key)), sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description']) [b'created_at', b'data', b'description', b'ended_at', b'started_at'])
def test_persistence_of_parent_job(self): def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key.""" """Storing jobs with parent job, either instance or key."""

@ -356,6 +356,8 @@ class TestFailedJobRegistry(RQTestCase):
job.refresh() job.refresh()
self.assertEqual(job.get_status(), JobStatus.QUEUED) self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.started_at, None)
self.assertEqual(job.ended_at, None)
worker.work(burst=True) worker.work(burst=True)
self.assertTrue(job in registry) self.assertTrue(job in registry)

Loading…
Cancel
Save