From 08de4190e7262956f3eb4cc61f3fc9723d806733 Mon Sep 17 00:00:00 2001 From: Yannis Spiliopoulos Date: Thu, 30 Jun 2016 13:17:28 -0400 Subject: [PATCH] Dry the code. Export handling failed current job to a method --- rq/worker.py | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3a192ef..0a1d090 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -530,10 +530,10 @@ class Worker(object): break if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: with self.connection._pipeline() as pipeline: - job.set_status(JobStatus.FAILED, pipeline=pipeline) - started_job_registry = StartedJobRegistry(job.origin, self.connection) - started_job_registry.remove(job, pipeline=pipeline) - self.set_current_job_id(None, pipeline=pipeline) + self.handle_current_job_failure( + job=job, + pipeline=pipeline + ) try: pipeline.execute() except Exception: @@ -611,6 +611,27 @@ class Worker(object): msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) + def handle_current_job_failure( + self, + job, + started_job_registry=None, + pipeline=None + ): + """Handles the failure or an executing job by: + 1. Setting the job status to failed + 2. Removing the job from the started_job_registry + 3. Setting the workers current job to None + """ + + if started_job_registry is None: + started_job_registry = StartedJobRegistry( + job.origin, + self.connection + ) + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + def perform_job(self, job, queue): """Performs the actual work of a job. Will/should only be called inside the work horse's process. @@ -651,9 +672,11 @@ class Worker(object): pipeline.execute() except Exception: - job.set_status(JobStatus.FAILED, pipeline=pipeline) - started_job_registry.remove(job, pipeline=pipeline) - self.set_current_job_id(None, pipeline=pipeline) + self.handle_current_job_failure( + job=job, + started_job_registry=started_job_registry, + pipeline=pipeline + ) try: pipeline.execute() except Exception: