From b0c0a84ab026e68e587e0ddc7736545daff42acf Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 30 Jul 2014 13:36:35 +0800 Subject: [PATCH] Moved some logic into worker.prepare_job_execution to make things testable. --- rq/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index be4a955..4348217 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,6 +23,7 @@ from .queue import get_failed_queue, Queue from .timeouts import UnixSignalDeathPenalty from .utils import import_attribute, make_colorizer, utcformat, utcnow from .version import VERSION +from .working_queue import WorkingQueue try: from procname import setprocname @@ -403,7 +404,7 @@ class Worker(object): self.heartbeat() return result - def heartbeat(self, timeout=0): + def heartbeat(self, timeout=0, pipeline=None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -415,7 +416,8 @@ class Worker(object): only larger. """ timeout = max(timeout, self.default_worker_ttl) - self.connection.expire(self.key, timeout) + connection = pipeline if pipeline is not None else self.connection + connection.expire(self.key, timeout) self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) @@ -468,19 +470,26 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def perform_job(self, job): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. + def prepare_job_execution(self, job): + """Performs misc bookkeeping like updating states prior to + job execution. """ - - self.set_state('busy') - self.set_current_job_id(job.id) - self.heartbeat((job.timeout or 180) + 60) + with self.connection._pipeline() as pipeline: + self.set_state('busy', pipeline=pipeline) + self.set_current_job_id(job.id, pipeline=pipeline) + self.heartbeat((job.timeout or 180) + 60, pipeline=pipeline) + pipeline.execute() self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ + self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: try: job.set_status(Status.STARTED)