Merge pull request #589 from samuelcolvin/job-started_at

add job.started_at
main
Selwin Ong 9 years ago
commit 5afd1a90e5

@ -152,8 +152,7 @@ class Job(object):
def set_status(self, status, pipeline=None):
self._status = status
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
self.hset_value('status', self._status, pipeline)
def _set_status(self, status):
warnings.warn(
@ -164,6 +163,14 @@ class Job(object):
status = property(_get_status, _set_status)
def set_started_at_now(self, pipeline=None):
now_fmt = utcformat(utcnow())
self.hset_value('started_at', now_fmt, pipeline)
def hset_value(self, key, value, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, key, value)
@property
def is_finished(self):
return self.get_status() == JobStatus.FINISHED
@ -306,6 +313,7 @@ class Job(object):
self.description = None
self.origin = None
self.enqueued_at = None
self.started_at = None
self.ended_at = None
self._result = None
self.exc_info = None
@ -410,6 +418,7 @@ class Job(object):
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info')
@ -432,6 +441,8 @@ class Job(object):
obj['description'] = self.description
if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at)
if self.started_at is not None:
obj['started_at'] = utcformat(self.started_at)
if self.ended_at is not None:
obj['ended_at'] = utcformat(self.ended_at)
if self._result is not None:

@ -551,6 +551,7 @@ class Worker(object):
registry = StartedJobRegistry(job.origin, self.connection)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
job.set_started_at_now(pipeline=pipeline)
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'

@ -54,6 +54,7 @@ class TestJob(RQTestCase):
# ...and nothing else
self.assertIsNone(job.origin)
self.assertIsNone(job.enqueued_at)
self.assertIsNone(job.started_at)
self.assertIsNone(job.ended_at)
self.assertIsNone(job.result)
self.assertIsNone(job.exc_info)

@ -86,6 +86,25 @@ class TestWorker(RQTestCase):
'Expected at least some work done.')
self.assertEqual(job.result, 'Hi there, Frank!')
def test_job_times(self):
"""job times are set correctly."""
q = Queue('foo')
w = Worker([q])
before = utcnow()
before = before.replace(microsecond=0)
job = q.enqueue(say_hello)
self.assertIsNotNone(job.enqueued_at)
self.assertIsNone(job.started_at)
self.assertIsNone(job.ended_at)
self.assertEquals(w.work(burst=True), True,
'Expected at least some work done.')
self.assertEquals(job.result, 'Hi there, Stranger!')
after = utcnow()
job.refresh()
self.assertTrue(before <= job.enqueued_at <= after, 'Not %s <= %s <= %s' % (before, job.enqueued_at, after))
self.assertTrue(before <= job.started_at <= after, 'Not %s <= %s <= %s' % (before, job.started_at, after))
self.assertTrue(before <= job.ended_at <= after, 'Not %s <= %s <= %s' % (before, job.ended_at, after))
def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue."""
q = Queue()

Loading…
Cancel
Save