Leave setting the state to the worker, not the Job itself.

Should fix #389.
main
Vincent Driessen 11 years ago
parent 712e663ffc
commit 865efd6e8c

@ -470,13 +470,9 @@ class Job(object):
"""Invokes the job function with the job arguments.""" """Invokes the job function with the job arguments."""
_job_stack.push(self.id) _job_stack.push(self.id)
try: try:
self.set_status(Status.STARTED)
self._result = self.func(*self.args, **self.kwargs) self._result = self.func(*self.args, **self.kwargs)
self.set_status(Status.FINISHED)
self.ended_at = utcnow()
finally: finally:
assert self.id == _job_stack.pop() assert self.id == _job_stack.pop()
return self._result return self._result
def get_ttl(self, default_ttl=None): def get_ttl(self, default_ttl=None):

@ -483,6 +483,7 @@ class Worker(object):
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
try: try:
job.set_status(Status.STARTED)
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
@ -494,6 +495,8 @@ class Worker(object):
result_ttl = job.get_ttl(self.default_result_ttl) result_ttl = job.get_ttl(self.default_result_ttl)
if result_ttl != 0: if result_ttl != 0:
job.ended_at = utcnow()
job._status = Status.FINISHED
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline)

Loading…
Cancel
Save