From 3596449cc0bf2e51809cfe09d0ab9fca522775db Mon Sep 17 00:00:00 2001 From: Alexey Katichev Date: Sun, 7 May 2017 16:57:12 +0300 Subject: [PATCH 1/3] remove implicit cleanup call from job.save --- rq/job.py | 15 ++++++++------- rq/queue.py | 3 +++ rq/registry.py | 1 + tests/test_job.py | 2 +- tests/test_queue.py | 12 ++++++++++++ 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/rq/job.py b/rq/job.py index 0ccf736..cabd3f9 100644 --- a/rq/job.py +++ b/rq/job.py @@ -480,18 +480,18 @@ class Job(object): def save(self, pipeline=None, include_meta=True): """ - Persists the current job instance to its corresponding Redis key. + Dumps the current job instance to its corresponding Redis key. - Exclude persisting the `meta` dictionary by setting + Exclude saving the `meta` dictionary by setting `include_meta=False`. This is useful to prevent clobbering user metadata without an expensive `refresh()` call first. + Redis key persistence may be altered by `cleanup()` method. """ key = self.key connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict(include_meta=include_meta)) - self.cleanup(self.ttl, pipeline=connection) def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" @@ -549,12 +549,10 @@ class Job(object): connection.delete(self.key) connection.delete(self.dependents_key) - # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" self.connection.persist(self.key) - self.ttl = -1 _job_stack.push(self.id) try: self._result = self._execute() @@ -609,9 +607,12 @@ class Job(object): self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return - elif ttl > 0: + else: connection = pipeline if pipeline is not None else self.connection - connection.expire(self.key, ttl) + if ttl > 0: + connection.expire(self.key, ttl) + else: + connection.persist(self.key) def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they diff --git a/rq/queue.py b/rq/queue.py index 090997e..bd3a6ec 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -223,6 +223,7 @@ class Queue(object): job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) pipe.execute() return job break @@ -296,6 +297,7 @@ class Queue(object): if job.timeout is None: job.timeout = self.DEFAULT_TIMEOUT job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) if self._async: self.push_job_id(job.id, pipeline=pipe, at_front=at_front) @@ -492,6 +494,7 @@ class FailedQueue(Queue): job.ended_at = utcnow() job.exc_info = exc_info job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire self.push_job_id(job.id, pipeline=pipeline) pipeline.execute() diff --git a/rq/registry.py b/rq/registry.py index 168dd8e..51e6dd0 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -93,6 +93,7 @@ class StartedJobRegistry(BaseRegistry): connection=self.connection) job.set_status(JobStatus.FAILED) job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline) except NoSuchJobError: pass diff --git a/tests/test_job.py b/tests/test_job.py index 1491f7f..46d7d39 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -386,7 +386,7 @@ class TestJob(RQTestCase): self.assertEqual(job.get_ttl(), ttl) job.save() job.perform() - self.assertEqual(job.get_ttl(), -1) + self.assertEqual(job.get_ttl(), ttl) self.assertTrue(job.exists(job.id)) self.assertEqual(job.result, 'Done sleeping...') diff --git a/tests/test_queue.py b/tests/test_queue.py index 294fb62..3eea3fb 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -650,3 +650,15 @@ class TestFailedQueue(RQTestCase): job.delete() self.assertFalse(job.id in failed_queue.get_job_ids()) + + def test_job_in_failed_queue_persists(self): + """Make sure failed job key does not expire""" + q = Queue('foo') + job = q.enqueue(div_by_zero, args=(1, 2, 3), ttl=5) + self.assertEqual(self.testconn.ttl(job.key), 5) + + job.set_status(JobStatus.FAILED) + failed_queue = get_failed_queue() + failed_queue.quarantine(job, Exception('Some fake error')) + + self.assertEqual(self.testconn.ttl(job.key), -1) From b7d50cedc4a83c81825bc51aa0518cd17c4e3e63 Mon Sep 17 00:00:00 2001 From: Alexey Katichev Date: Sun, 7 May 2017 17:11:44 +0300 Subject: [PATCH 2/3] replace job.id with job instance in local _job_stack --- rq/job.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/rq/job.py b/rq/job.py index cabd3f9..ea25269 100644 --- a/rq/job.py +++ b/rq/job.py @@ -75,11 +75,10 @@ def get_current_job(connection=None, job_class=None): """Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. """ - job_class = job_class or Job - job_id = _job_stack.top - if job_id is None: - return None - return job_class.fetch(job_id, connection=connection) + if job_class: + warnings.warn("job_class argument for get_current_job is deprecated.", + DeprecationWarning) + return _job_stack.top class Job(object): @@ -553,11 +552,11 @@ class Job(object): def perform(self): # noqa """Invokes the job function with the job arguments.""" self.connection.persist(self.key) - _job_stack.push(self.id) + _job_stack.push(self) try: self._result = self._execute() finally: - assert self.id == _job_stack.pop() + assert self is _job_stack.pop() return self._result def _execute(self): @@ -607,7 +606,7 @@ class Job(object): self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return - else: + else: connection = pipeline if pipeline is not None else self.connection if ttl > 0: connection.expire(self.key, ttl) From 09697e567f18590e31f8faad3c2f71e7b2f41659 Mon Sep 17 00:00:00 2001 From: Alexey Katichev Date: Mon, 22 May 2017 14:31:22 +0300 Subject: [PATCH 3/3] revert back job.cleanup changes --- rq/job.py | 7 ++----- tests/test_queue.py | 5 +++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/rq/job.py b/rq/job.py index ea25269..b3800a5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -606,12 +606,9 @@ class Job(object): self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return - else: + elif ttl > 0: connection = pipeline if pipeline is not None else self.connection - if ttl > 0: - connection.expire(self.key, ttl) - else: - connection.persist(self.key) + connection.expire(self.key, ttl) def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they diff --git a/tests/test_queue.py b/tests/test_queue.py index 3eea3fb..7129452 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -654,9 +654,10 @@ class TestFailedQueue(RQTestCase): def test_job_in_failed_queue_persists(self): """Make sure failed job key does not expire""" q = Queue('foo') - job = q.enqueue(div_by_zero, args=(1, 2, 3), ttl=5) + job = q.enqueue(div_by_zero, args=(1,), ttl=5) self.assertEqual(self.testconn.ttl(job.key), 5) - + + self.assertRaises(ZeroDivisionError, job.perform) job.set_status(JobStatus.FAILED) failed_queue = get_failed_queue() failed_queue.quarantine(job, Exception('Some fake error'))