diff --git a/rq/worker.py b/rq/worker.py index df4812c..92d55bb 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -332,12 +332,7 @@ class Worker(object): if self.stopped: self.log.info('Stopping on request.') break - self.set_state('idle') - qnames = self.queue_names() - self.procline('Listening on %s' % ','.join(qnames)) - self.log.info('') - self.log.info('*** Listening on %s...' % - green(', '.join(qnames))) + timeout = None if burst else max(1, self.default_worker_ttl - 60) try: result = self.dequeue_job_and_maintain_ttl(timeout) @@ -346,21 +341,10 @@ class Worker(object): except StopRequested: break - self.set_state('busy') - job, queue = result - self.set_current_job_id(job.id) - - # Use the public setter here, to immediately update Redis - job.set_status(Status.STARTED) - self.log.info('%s: %s (%s)' % (green(queue.name), - blue(job.description), job.id)) - - self.heartbeat((job.timeout or 180) + 60) self.execute_job(job) self.heartbeat() - self.set_current_job_id(None) - + if job.get_status() == Status.FINISHED: queue.enqueue_dependents(job) @@ -372,11 +356,25 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): result = None + qnames = self.queue_names() + + self.set_state('idle') + self.procline('Listening on %s' % ','.join(qnames)) + self.log.info('') + self.log.info('*** Listening on %s...' % + green(', '.join(qnames))) + while True: self.heartbeat() + try: result = Queue.dequeue_any(self.queues, timeout, connection=self.connection) + if result is not None: + job, queue = result + self.log.info('%s: %s (%s)' % (green(queue.name), + blue(job.description), job.id)) + break except DequeueTimeout: pass @@ -453,6 +451,12 @@ class Worker(object): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ + + self.set_state('busy') + self.set_current_job_id(job.id) + job.set_status(Status.STARTED) + self.heartbeat((job.timeout or 180) + 60) + self.procline('Processing %s from %s since %s' % ( job.func_name, job.origin, time.time())) @@ -467,11 +471,13 @@ class Worker(object): job._result = rv job._status = Status.FINISHED job.ended_at = utcnow() + self.set_current_job_id(None, pipeline=pipeline) result_ttl = job.get_ttl(self.default_result_ttl) if result_ttl != 0: job.save(pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() except Exception: