From 60c7a3cc6e90bf167f8c98e3232793718846918f Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 7 Sep 2014 17:03:17 +0700 Subject: [PATCH] working_queue.remove call should be pipelined. --- rq/job.py | 5 +++-- rq/worker.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index ef7a266..bd91f13 100644 --- a/rq/job.py +++ b/rq/job.py @@ -147,9 +147,10 @@ class Job(object): ) return self.get_status() - def set_status(self, status): + def set_status(self, status, pipeline=None): self._status = status - self.connection.hset(self.key, 'status', self._status) + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'status', self._status) def _set_status(self, status): warnings.warn( diff --git a/rq/worker.py b/rq/worker.py index 3fa0611..011731e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -474,13 +474,15 @@ class Worker(object): """Performs misc bookkeeping like updating states prior to job execution. """ + timeout = (job.timeout or 180) + 60 + with self.connection._pipeline() as pipeline: - timeout = (job.timeout or 180) + 60 self.set_state('busy', pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) self.heartbeat(timeout, pipeline=pipeline) working_queue = WorkingQueue(job.origin, self.connection) working_queue.add(job, timeout, pipeline=pipeline) + job.set_status(Status.STARTED, pipeline=pipeline) pipeline.execute() self.procline('Processing %s from %s since %s' % ( @@ -497,7 +499,6 @@ class Worker(object): working_queue = WorkingQueue(job.origin, self.connection) try: - job.set_status(Status.STARTED) with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() @@ -518,9 +519,10 @@ class Worker(object): pipeline.execute() except Exception: - # Use the public setter here, to immediately update Redis - job.set_status(Status.FAILED) - working_queue.remove(job) + job.set_status(Status.FAILED, pipeline=pipeline) + working_queue.remove(job, pipeline=pipeline) + pipeline.execute() + self.handle_exception(job, *sys.exc_info()) return False