diff --git a/rq/worker.py b/rq/worker.py index 0321ca0..69aac70 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -416,28 +416,28 @@ class Worker(object): job.func_name, job.origin, time.time())) - try: - with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): - rv = job.perform() - - # Pickle the result in the same try-except block since we need to - # use the same exc handling when pickling fails - job._result = rv - job._status = Status.FINISHED - job.ended_at = utcnow() - - result_ttl = job.get_ttl(self.default_result_ttl) - pipeline = self.connection._pipeline() - if result_ttl != 0: - job.save(pipeline=pipeline) - job.cleanup(result_ttl, pipeline=pipeline) - pipeline.execute() - - except: - # Use the public setter here, to immediately update Redis - job.status = Status.FAILED - self.handle_exception(job, *sys.exc_info()) - return False + with self.connection.pipeline() as pipeline: + try: + with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): + rv = job.perform() + + # Pickle the result in the same try-except block since we need to + # use the same exc handling when pickling fails + job._result = rv + job._status = Status.FINISHED + job.ended_at = utcnow() + + result_ttl = job.get_ttl(self.default_result_ttl) + if result_ttl != 0: + job.save(pipeline=pipeline) + job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() + + except Exception: + # Use the public setter here, to immediately update Redis + job.status = Status.FAILED + self.handle_exception(job, *sys.exc_info()) + return False if rv is None: self.log.info('Job OK')