From 606d4fa10f9f20480b4d5dca6c11c54431d63758 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 10:32:21 +0200 Subject: [PATCH 1/9] Add optional pipeline parameter to set_state. --- rq/worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 09af23c..25195a0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -215,9 +215,11 @@ class Worker(object): p.expire(self.key, 60) p.execute() - def set_state(self, new_state): + def set_state(self, new_state, pipeline=None): self._state = new_state - self.connection.hset(self.key, 'state', new_state) + + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'state', new_state) def get_state(self): return self._state From c745f6811ab1f40209ef9c94bf8ef6c7800e684b Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 10:52:56 +0200 Subject: [PATCH 2/9] Added job_id property, storing the id of the currently executing job. --- rq/worker.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 25195a0..03610aa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -93,6 +93,7 @@ class Worker(object): worker = cls([], name, connection=connection) queues = as_text(connection.hget(worker.key, 'queues')) worker._state = connection.hget(worker.key, 'state') or '?' + worker._job_id = connection.hget(worker.key, 'current_job') or None if queues: worker.queues = [Queue(queue, connection=connection) for queue in queues.split(',')] @@ -226,6 +227,21 @@ class Worker(object): state = property(get_state, set_state) + def set_job_id(self, new_job_id, pipeline=None): + self._job_id = new_job_id + + connection = pipeline if pipeline is not None else self.connection + + if new_job_id is None: + connection.hdel(self.key, 'current_job') + else: + connection.hset(self.key, 'current_job', new_job_id) + + def get_job_id(self): + return self._job_id + + job_id = property(get_job_id, set_job_id) + @property def stopped(self): return self._stopped @@ -320,6 +336,8 @@ class Worker(object): self.state = 'busy' job, queue = result + self.job_id = job.id + # Use the public setter here, to immediately update Redis job.status = Status.STARTED self.log.info('%s: %s (%s)' % (green(queue.name), @@ -328,6 +346,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) + self.job_id = None + if job.status == 'finished': queue.enqueue_dependents(job) From 3d0138d31961035d9411efce38fb18420551ad45 Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Wed, 25 Sep 2013 11:14:28 +0200 Subject: [PATCH 3/9] Added Worker.get_current_job() convenience method. --- rq/worker.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 03610aa..37ae655 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -242,6 +242,15 @@ class Worker(object): job_id = property(get_job_id, set_job_id) + # most client will want to use the method below to query the current job + def get_current_job(self): + job_id = self.get_job_id() + + if job_id is None: + return None + + return Job.safe_fetch(job_id) + @property def stopped(self): return self._stopped From e81ee5b672ad842c13927636a406f360498ea12e Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Mon, 3 Feb 2014 08:58:16 +0100 Subject: [PATCH 4/9] Update worker.py Use docstring instead of comment. Sorry. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 37ae655..d3a5489 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -242,8 +242,8 @@ class Worker(object): job_id = property(get_job_id, set_job_id) - # most client will want to use the method below to query the current job def get_current_job(self): + """Returns the job id of the currently executing job.""" job_id = self.get_job_id() if job_id is None: From 37376cfe42be63e48bb36eb12590cadf46071ebf Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 5 Mar 2014 16:51:32 +0700 Subject: [PATCH 5/9] Deprecate worker.state. --- rq/worker.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c9faa7e..9048e68 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,10 +14,10 @@ import traceback import logging from .queue import Queue, get_failed_queue from .connections import get_current_connection -from .job import Job, Status +from .job import Status from .utils import make_colorizer, utcnow, utcformat from .logutils import setup_loghandlers -from .exceptions import NoQueueError, UnpickleError, DequeueTimeout +from .exceptions import NoQueueError, DequeueTimeout from .timeouts import death_penalty_after from .version import VERSION from rq.compat import text_type, as_text @@ -213,14 +213,24 @@ class Worker(object): p.expire(self.key, 60) p.execute() - def set_state(self, new_state): - self._state = new_state - self.connection.hset(self.key, 'state', new_state) + def set_state(self, state): + self._state = state + self.connection.hset(self.key, 'state', state) + + def _set_state(self, state): + self.set_state(state) def get_state(self): return self._state - state = property(get_state, set_state) + def _get_state(self): + """Raise a DeprecationWarning if ``worker.state == X`` is used""" + raise DeprecationWarning( + "worker.state is deprecated, use worker.get_state() instead." + ) + return self.get_state() + + state = property(_get_state, _set_state) @property def stopped(self): @@ -263,7 +273,7 @@ class Worker(object): # If shutdown is requested in the middle of a job, wait until # finish before shutting down - if self.state == 'busy': + if self.get_state() == 'busy': self._stopped = True self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') @@ -289,13 +299,13 @@ class Worker(object): did_perform_work = False self.register_birth() self.log.info('RQ worker started, version %s' % VERSION) - self.state = 'starting' + self.set_state('starting') try: while True: if self.stopped: self.log.info('Stopping on request.') break - self.state = 'idle' + self.set_state('idle') qnames = self.queue_names() self.procline('Listening on %s' % ','.join(qnames)) self.log.info('') @@ -309,7 +319,7 @@ class Worker(object): except StopRequested: break - self.state = 'busy' + self.set_state('busy') job, queue = result # Use the public setter here, to immediately update Redis From 93bb9ec5f4044235dead40876ffa4b940618f5af Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 08:57:50 +0700 Subject: [PATCH 6/9] Added tests for worker.get_current_job(). --- rq/worker.py | 2 +- tests/test_worker.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index ac4d7e6..a288d21 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -254,7 +254,7 @@ class Worker(object): if job_id is None: return None - return Job.safe_fetch(job_id) + return Job.fetch(job_id, self.connection) @property def stopped(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index dca6074..b66ab59 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,7 +1,6 @@ import os -from time import sleep from tests import RQTestCase, slow -from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ +from tests.fixtures import say_hello, div_by_zero, create_file, \ create_file_after_timeout from tests.helpers import strip_microseconds from rq import Queue, Worker, get_failed_queue @@ -250,3 +249,17 @@ class TestWorker(RQTestCase): w.work(burst=True) job = Job.fetch(job.id) self.assertNotEqual(job.status, Status.FINISHED) + + def test_get_current_job(self): + """Ensure worker.get_current_job() works properly""" + q = Queue() + worker = Worker([q]) + job = q.enqueue_call(say_hello) + + self.assertEqual(self.testconn.hget(worker.key, 'current_job'), None) + worker.set_job_id(job.id) + self.assertEqual( + worker.get_job_id(), + self.testconn.hget(worker.key, 'current_job') + ) + self.assertEqual(worker.get_current_job(), job) From 802ecb5ccb73edf497e507bb264a86a35dbf8bf3 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 09:02:59 +0700 Subject: [PATCH 7/9] Renamed worker.set_job_id() and worker.get_job_id() for consistency. --- rq/worker.py | 21 ++++++++++----------- tests/test_worker.py | 4 ++-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index a288d21..7de07c3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -231,25 +231,24 @@ class Worker(object): "worker.state is deprecated, use worker.get_state() instead." ) return self.get_state() - - state = property(_get_state, _set_state) - def set_job_id(self, new_job_id, pipeline=None): - self._job_id = new_job_id + state = property(_get_state, _set_state) + def set_current_job_id(self, job_id, pipeline=None): connection = pipeline if pipeline is not None else self.connection - if new_job_id is None: + if job_id is None: connection.hdel(self.key, 'current_job') else: - connection.hset(self.key, 'current_job', new_job_id) + connection.hset(self.key, 'current_job', job_id) - def get_job_id(self): - return self._job_id + def get_current_job_id(self, pipeline=None): + connection = pipeline if pipeline is not None else self.connection + return connection.hget(self.key, 'current_job') def get_current_job(self): """Returns the job id of the currently executing job.""" - job_id = self.get_job_id() + job_id = self.get_current_job_id() if job_id is None: return None @@ -346,7 +345,7 @@ class Worker(object): self.set_state('busy') job, queue = result - self.set_job_id(job.id) + self.set_current_job_id(job.id) # Use the public setter here, to immediately update Redis job.status = Status.STARTED @@ -356,7 +355,7 @@ class Worker(object): self.heartbeat((job.timeout or 180) + 60) self.execute_job(job) self.heartbeat() - self.set_job_id(None) + self.set_current_job_id(None) if job.status == Status.FINISHED: queue.enqueue_dependents(job) diff --git a/tests/test_worker.py b/tests/test_worker.py index b66ab59..7319bb6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -257,9 +257,9 @@ class TestWorker(RQTestCase): job = q.enqueue_call(say_hello) self.assertEqual(self.testconn.hget(worker.key, 'current_job'), None) - worker.set_job_id(job.id) + worker.set_current_job_id(job.id) self.assertEqual( - worker.get_job_id(), + worker.get_current_job_id(), self.testconn.hget(worker.key, 'current_job') ) self.assertEqual(worker.get_current_job(), job) From 2fe5d9e25e923f4dd47c62788a7c1db9c7526f10 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 09:41:32 +0700 Subject: [PATCH 8/9] Python 3 compatibility with worker.get_current_job(). --- rq/worker.py | 2 +- tests/test_worker.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 7de07c3..5afe7fa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -244,7 +244,7 @@ class Worker(object): def get_current_job_id(self, pipeline=None): connection = pipeline if pipeline is not None else self.connection - return connection.hget(self.key, 'current_job') + return as_text(connection.hget(self.key, 'current_job')) def get_current_job(self): """Returns the job id of the currently executing job.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 7319bb6..a7fa320 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,6 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, create_file, \ create_file_after_timeout from tests.helpers import strip_microseconds from rq import Queue, Worker, get_failed_queue +from rq.compat import as_text from rq.job import Job, Status @@ -185,7 +186,7 @@ class TestWorker(RQTestCase): # TODO: Having to do the manual refresh() here is really ugly! res.refresh() - self.assertIn('JobTimeoutException', res.exc_info) + self.assertIn('JobTimeoutException', as_text(res.exc_info)) def test_worker_sets_result_ttl(self): """Ensure that Worker properly sets result_ttl for individual jobs.""" @@ -260,6 +261,6 @@ class TestWorker(RQTestCase): worker.set_current_job_id(job.id) self.assertEqual( worker.get_current_job_id(), - self.testconn.hget(worker.key, 'current_job') + as_text(self.testconn.hget(worker.key, 'current_job')) ) self.assertEqual(worker.get_current_job(), job) From 72457d2286c57427204404091f7bf63dcf24e489 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 6 Mar 2014 09:44:44 +0700 Subject: [PATCH 9/9] Added DeprecationWarning to worker.state. --- rq/worker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 5afe7fa..c015dfa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -220,6 +220,10 @@ class Worker(object): connection.hset(self.key, 'state', state) def _set_state(self, state): + """Raise a DeprecationWarning if ``worker.state = X`` is used""" + raise DeprecationWarning( + "worker.state is deprecated, use worker.set_state() instead." + ) self.set_state(state) def get_state(self):