diff --git a/rq/job.py b/rq/job.py index b45ed28..a9c5cc5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -152,8 +152,7 @@ class Job(object): def set_status(self, status, pipeline=None): self._status = status - connection = pipeline if pipeline is not None else self.connection - connection.hset(self.key, 'status', self._status) + self.hset_value('status', self._status, pipeline) def _set_status(self, status): warnings.warn( @@ -164,6 +163,14 @@ class Job(object): status = property(_get_status, _set_status) + def set_started_at_now(self, pipeline=None): + now_fmt = utcformat(utcnow()) + self.hset_value('started_at', now_fmt, pipeline) + + def hset_value(self, key, value, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, key, value) + @property def is_finished(self): return self.get_status() == JobStatus.FINISHED @@ -306,6 +313,7 @@ class Job(object): self.description = None self.origin = None self.enqueued_at = None + self.started_at = None self.ended_at = None self._result = None self.exc_info = None @@ -410,6 +418,7 @@ class Job(object): self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) self.enqueued_at = to_date(as_text(obj.get('enqueued_at'))) + self.started_at = to_date(as_text(obj.get('started_at'))) self.ended_at = to_date(as_text(obj.get('ended_at'))) self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') @@ -432,6 +441,8 @@ class Job(object): obj['description'] = self.description if self.enqueued_at is not None: obj['enqueued_at'] = utcformat(self.enqueued_at) + if self.started_at is not None: + obj['started_at'] = utcformat(self.started_at) if self.ended_at is not None: obj['ended_at'] = utcformat(self.ended_at) if self._result is not None: diff --git a/rq/worker.py b/rq/worker.py index d701109..a6e4a38 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -551,6 +551,7 @@ class Worker(object): registry = StartedJobRegistry(job.origin, self.connection) registry.add(job, timeout, pipeline=pipeline) job.set_status(JobStatus.STARTED, pipeline=pipeline) + job.set_started_at_now(pipeline=pipeline) pipeline.execute() msg = 'Processing {0} from {1} since {2}' diff --git a/tests/test_job.py b/tests/test_job.py index 387afeb..0845259 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -54,6 +54,7 @@ class TestJob(RQTestCase): # ...and nothing else self.assertIsNone(job.origin) self.assertIsNone(job.enqueued_at) + self.assertIsNone(job.started_at) self.assertIsNone(job.ended_at) self.assertIsNone(job.result) self.assertIsNone(job.exc_info) diff --git a/tests/test_worker.py b/tests/test_worker.py index 4688edc..48298ee 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -86,6 +86,25 @@ class TestWorker(RQTestCase): 'Expected at least some work done.') self.assertEqual(job.result, 'Hi there, Frank!') + def test_job_times(self): + """job times are set correctly.""" + q = Queue('foo') + w = Worker([q]) + before = utcnow() + before = before.replace(microsecond=0) + job = q.enqueue(say_hello) + self.assertIsNotNone(job.enqueued_at) + self.assertIsNone(job.started_at) + self.assertIsNone(job.ended_at) + self.assertEquals(w.work(burst=True), True, + 'Expected at least some work done.') + self.assertEquals(job.result, 'Hi there, Stranger!') + after = utcnow() + job.refresh() + self.assertTrue(before <= job.enqueued_at <= after, 'Not %s <= %s <= %s' % (before, job.enqueued_at, after)) + self.assertTrue(before <= job.started_at <= after, 'Not %s <= %s <= %s' % (before, job.started_at, after)) + self.assertTrue(before <= job.ended_at <= after, 'Not %s <= %s <= %s' % (before, job.ended_at, after)) + def test_work_is_unreadable(self): """Unreadable jobs are put on the failed queue.""" q = Queue()