From 606d4fa10f9f20480b4d5dca6c11c54431d63758 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 10:32:21 +0200 Subject: [PATCH 1/4] Add optional pipeline parameter to set_state. --- rq/worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 09af23c..25195a0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -215,9 +215,11 @@ class Worker(object): p.expire(self.key, 60) p.execute() - def set_state(self, new_state): + def set_state(self, new_state, pipeline=None): self._state = new_state - self.connection.hset(self.key, 'state', new_state) + + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'state', new_state) def get_state(self): return self._state From c745f6811ab1f40209ef9c94bf8ef6c7800e684b Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 10:52:56 +0200 Subject: [PATCH 2/4] Added job_id property, storing the id of the currently executing job. --- rq/worker.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 25195a0..03610aa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -93,6 +93,7 @@ class Worker(object): worker = cls([], name, connection=connection) queues = as_text(connection.hget(worker.key, 'queues')) worker._state = connection.hget(worker.key, 'state') or '?' + worker._job_id = connection.hget(worker.key, 'current_job') or None if queues: worker.queues = [Queue(queue, connection=connection) for queue in queues.split(',')] @@ -226,6 +227,21 @@ class Worker(object): state = property(get_state, set_state) + def set_job_id(self, new_job_id, pipeline=None): + self._job_id = new_job_id + + connection = pipeline if pipeline is not None else self.connection + + if new_job_id is None: + connection.hdel(self.key, 'current_job') + else: + connection.hset(self.key, 'current_job', new_job_id) + + def get_job_id(self): + return self._job_id + + job_id = property(get_job_id, set_job_id) + @property def stopped(self): return self._stopped @@ -320,6 +336,8 @@ class Worker(object): self.state = 'busy' job, queue = result + self.job_id = job.id + # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), @@ -328,6 +346,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) + self.job_id = None + if job.status == 'finished': queue.enqueue_dependents(job) From 3d0138d31961035d9411efce38fb18420551ad45 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 11:14:28 +0200 Subject: [PATCH 3/4] Added Worker.get_current_job() convenience method. --- rq/worker.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 03610aa..37ae655 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -242,6 +242,15 @@ class Worker(object): job_id = property(get_job_id, set_job_id) + # most client will want to use the method below to query the current job + def get_current_job(self): + job_id = self.get_job_id() + + if job_id is None: + return None + + return Job.safe_fetch(job_id) + @property def stopped(self): return self._stopped From e81ee5b672ad842c13927636a406f360498ea12e Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Mon, 3 Feb 2014 08:58:16 +0100 Subject: [PATCH 4/4] Update worker.py Use docstring instead of comment. Sorry. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 37ae655..d3a5489 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -242,8 +242,8 @@ class Worker(object): job_id = property(get_job_id, set_job_id) - # most client will want to use the method below to query the current job def get_current_job(self): + """Returns the job id of the currently executing job.""" job_id = self.get_job_id() if job_id is None: