diff --git a/rq/worker.py b/rq/worker.py index 25195a0..03610aa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -93,6 +93,7 @@ class Worker(object): worker = cls([], name, connection=connection) queues = as_text(connection.hget(worker.key, 'queues')) worker._state = connection.hget(worker.key, 'state') or '?' + worker._job_id = connection.hget(worker.key, 'current_job') or None if queues: worker.queues = [Queue(queue, connection=connection) for queue in queues.split(',')] @@ -226,6 +227,21 @@ class Worker(object): state = property(get_state, set_state) + def set_job_id(self, new_job_id, pipeline=None): + self._job_id = new_job_id + + connection = pipeline if pipeline is not None else self.connection + + if new_job_id is None: + connection.hdel(self.key, 'current_job') + else: + connection.hset(self.key, 'current_job', new_job_id) + + def get_job_id(self): + return self._job_id + + job_id = property(get_job_id, set_job_id) + @property def stopped(self): return self._stopped @@ -320,6 +336,8 @@ class Worker(object): self.state = 'busy' job, queue = result + self.job_id = job.id + # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), @@ -328,6 +346,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) + self.job_id = None + if job.status == 'finished': queue.enqueue_dependents(job)