diff --git a/rq/worker.py b/rq/worker.py index 9048e68..ac4d7e6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,7 +14,7 @@ import traceback import logging from .queue import Queue, get_failed_queue from .connections import get_current_connection -from .job import Status +from .job import Job, Status from .utils import make_colorizer, utcnow, utcformat from .logutils import setup_loghandlers from .exceptions import NoQueueError, DequeueTimeout @@ -91,6 +91,7 @@ class Worker(object): worker = cls([], name, connection=connection) queues = as_text(connection.hget(worker.key, 'queues')) worker._state = connection.hget(worker.key, 'state') or '?' + worker._job_id = connection.hget(worker.key, 'current_job') or None if queues: worker.queues = [Queue(queue, connection=connection) for queue in queues.split(',')] @@ -213,9 +214,10 @@ class Worker(object): p.expire(self.key, 60) p.execute() - def set_state(self, state): + def set_state(self, state, pipeline=None): self._state = state - self.connection.hset(self.key, 'state', state) + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'state', state) def _set_state(self, state): self.set_state(state) @@ -232,6 +234,28 @@ class Worker(object): state = property(_get_state, _set_state) + def set_job_id(self, new_job_id, pipeline=None): + self._job_id = new_job_id + + connection = pipeline if pipeline is not None else self.connection + + if new_job_id is None: + connection.hdel(self.key, 'current_job') + else: + connection.hset(self.key, 'current_job', new_job_id) + + def get_job_id(self): + return self._job_id + + def get_current_job(self): + """Returns the job id of the currently executing job.""" + job_id = self.get_job_id() + + if job_id is None: + return None + + return Job.safe_fetch(job_id) + @property def stopped(self): return self._stopped @@ -322,6 +346,8 @@ class Worker(object): self.set_state('busy') job, queue = result + self.set_job_id(job.id) + # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), @@ -330,6 +356,8 @@ class Worker(object): self.heartbeat((job.timeout or 180) + 60) self.execute_job(job) self.heartbeat() + self.set_job_id(None) + if job.status == Status.FINISHED: queue.enqueue_dependents(job)