working_queue.remove call should be pipelined.

main
Selwin Ong 10 years ago
parent 893fc5a6ae
commit d667fb0713

@ -147,9 +147,10 @@ class Job(object):
) )
return self.get_status() return self.get_status()
def set_status(self, status): def set_status(self, status, pipeline=None):
self._status = status self._status = status
self.connection.hset(self.key, 'status', self._status) connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
def _set_status(self, status): def _set_status(self, status):
warnings.warn( warnings.warn(

@ -474,13 +474,15 @@ class Worker(object):
"""Performs misc bookkeeping like updating states prior to """Performs misc bookkeeping like updating states prior to
job execution. job execution.
""" """
with self.connection._pipeline() as pipeline:
timeout = (job.timeout or 180) + 60 timeout = (job.timeout or 180) + 60
with self.connection._pipeline() as pipeline:
self.set_state('busy', pipeline=pipeline) self.set_state('busy', pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(timeout, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline)
working_queue = WorkingQueue(job.origin, self.connection) working_queue = WorkingQueue(job.origin, self.connection)
working_queue.add(job, timeout, pipeline=pipeline) working_queue.add(job, timeout, pipeline=pipeline)
job.set_status(Status.STARTED, pipeline=pipeline)
pipeline.execute() pipeline.execute()
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
@ -497,7 +499,6 @@ class Worker(object):
working_queue = WorkingQueue(job.origin, self.connection) working_queue = WorkingQueue(job.origin, self.connection)
try: try:
job.set_status(Status.STARTED)
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
@ -518,9 +519,10 @@ class Worker(object):
pipeline.execute() pipeline.execute()
except Exception: except Exception:
# Use the public setter here, to immediately update Redis job.set_status(Status.FAILED, pipeline=pipeline)
job.set_status(Status.FAILED) working_queue.remove(job, pipeline=pipeline)
working_queue.remove(job) pipeline.execute()
self.handle_exception(job, *sys.exc_info()) self.handle_exception(job, *sys.exc_info())
return False return False

Loading…
Cancel
Save