Merge pull request #321 from selwin/cleanup-worker.work

Simplify worker.work() by moving some functionalities to relevant methods
main
Selwin Ong 11 years ago
commit 56053a33ec

@ -448,9 +448,13 @@ class Job(object):
"""Invokes the job function with the job arguments."""
_job_stack.push(self.id)
try:
self.set_status(Status.STARTED)
self._result = self.func(*self.args, **self.kwargs)
self.set_status(Status.FINISHED)
self.ended_at = utcnow()
finally:
assert self.id == _job_stack.pop()
return self._result
def get_ttl(self, default_ttl=None):

@ -332,12 +332,7 @@ class Worker(object):
if self.stopped:
self.log.info('Stopping on request.')
break
self.set_state('idle')
qnames = self.queue_names()
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
result = self.dequeue_job_and_maintain_ttl(timeout)
@ -346,20 +341,9 @@ class Worker(object):
except StopRequested:
break
self.set_state('busy')
job, queue = result
self.set_current_job_id(job.id)
# Use the public setter here, to immediately update Redis
job.set_status(Status.STARTED)
self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id))
self.heartbeat((job.timeout or 180) + 60)
self.execute_job(job)
self.heartbeat()
self.set_current_job_id(None)
if job.get_status() == Status.FINISHED:
queue.enqueue_dependents(job)
@ -372,11 +356,25 @@ class Worker(object):
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
self.set_state('idle')
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
green(', '.join(qnames)))
while True:
self.heartbeat()
try:
result = Queue.dequeue_any(self.queues, timeout,
connection=self.connection)
if result is not None:
job, queue = result
self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id))
break
except DequeueTimeout:
pass
@ -453,6 +451,11 @@ class Worker(object):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.set_state('busy')
self.set_current_job_id(job.id)
self.heartbeat((job.timeout or 180) + 60)
self.procline('Processing %s from %s since %s' % (
job.func_name,
job.origin, time.time()))
@ -465,13 +468,14 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails
job._result = rv
job._status = Status.FINISHED
job.ended_at = utcnow()
self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl)
if result_ttl != 0:
job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
except Exception:

Loading…
Cancel
Save