From 56c89c1334e9df043035cb68a930fb634f7c1d33 Mon Sep 17 00:00:00 2001 From: Benjamin Root Date: Fri, 6 Jan 2017 17:17:42 -0500 Subject: [PATCH 1/4] Trigger a refresh after job execution * Prevents clobbering of user-supplied metadata modified during the job execution --- rq/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 4549b55..fb7047a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -705,6 +705,7 @@ class Worker(object): try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() + job.refresh() job.ended_at = utcnow() @@ -718,6 +719,7 @@ class Worker(object): started_job_registry=started_job_registry ) except Exception: + job.refresh() self.handle_job_failure( job=job, started_job_registry=started_job_registry From f68aeff4810a7e87a16351f570ca5ec5c4e72041 Mon Sep 17 00:00:00 2001 From: Benjamin Root Date: Mon, 9 Jan 2017 11:49:36 -0500 Subject: [PATCH 2/4] Added integration test for the metadata persistence feature --- tests/fixtures.py | 6 ++++++ tests/test_worker.py | 22 +++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index d040b9d..df946c4 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -61,6 +61,12 @@ def access_self(): assert get_current_job() is not None +def modify_self(meta): + j = get_current_job() + j.meta.update(meta) + j.save() + + def echo(*args, **kwargs): return (args, kwargs) diff --git a/tests/test_worker.py b/tests/test_worker.py index 4449aa8..10942bf 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -19,7 +19,8 @@ import mock from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, - run_dummy_heroku_worker, access_self) + run_dummy_heroku_worker, access_self, + modify_self) from tests.helpers import strip_microseconds from rq import (get_failed_queue, Queue, SimpleWorker, Worker, @@ -606,6 +607,25 @@ class TestWorker(RQTestCase): # So before that fix the call count was 4 instead of 3 self.assertEqual(mocked.call_count, 3) + def test_self_modification_persistence(self): + """Make sure that any meta modification done by + the job itself persists completely through the + queue/worker/job stack.""" + q = Queue() + # Also make sure that previously existing metadata + # persists properly + job = q.enqueue(modify_self, meta={'foo': 'bar', 'baz': 42}, + args=[{'baz': 10, 'newinfo': 'waka'}]) + + w = Worker([q]) + w.work(burst=True) + + job_check = Job.fetch(job.id) + self.assertEqual(set(job_check.meta.keys()), {'foo', 'baz', 'newinfo'}) + self.assertEqual(job_check.meta['foo'], 'bar') + self.assertEqual(job_check.meta['baz'], 10) + self.assertEqual(job_check.meta['newinfo'], 'waka') + def kill_worker(pid, double_kill): # wait for the worker to be started over on the main process From 30a7ab48991d430f2acbadfb10a2d71c64c73967 Mon Sep 17 00:00:00 2001 From: Benjamin Root Date: Mon, 9 Jan 2017 12:06:57 -0500 Subject: [PATCH 3/4] Add similar test for when the job fails --- tests/fixtures.py | 7 +++++++ tests/test_worker.py | 31 +++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index df946c4..031a0e2 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -67,6 +67,13 @@ def modify_self(meta): j.save() +def modify_self_and_error(meta): + j = get_current_job() + j.meta.update(meta) + j.save() + return 1 / 0 + + def echo(*args, **kwargs): return (args, kwargs) diff --git a/tests/test_worker.py b/tests/test_worker.py index 10942bf..885de29 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -20,7 +20,7 @@ from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, run_dummy_heroku_worker, access_self, - modify_self) + modify_self, modify_self_and_error) from tests.helpers import strip_microseconds from rq import (get_failed_queue, Queue, SimpleWorker, Worker, @@ -621,7 +621,34 @@ class TestWorker(RQTestCase): w.work(burst=True) job_check = Job.fetch(job.id) - self.assertEqual(set(job_check.meta.keys()), {'foo', 'baz', 'newinfo'}) + self.assertEqual(set(job_check.meta.keys()), + set(['foo', 'baz', 'newinfo'])) + self.assertEqual(job_check.meta['foo'], 'bar') + self.assertEqual(job_check.meta['baz'], 10) + self.assertEqual(job_check.meta['newinfo'], 'waka') + + def test_self_modification_persistence_with_error(self): + """Make sure that any meta modification done by + the job itself persists completely through the + queue/worker/job stack -- even if the job errored""" + q = Queue() + failed_q = get_failed_queue() + # Also make sure that previously existing metadata + # persists properly + job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42}, + args=[{'baz': 10, 'newinfo': 'waka'}]) + + w = Worker([q]) + w.work(burst=True) + + # Postconditions + self.assertEqual(q.count, 0) + self.assertEqual(failed_q.count, 1) + self.assertEqual(w.get_current_job_id(), None) + + job_check = Job.fetch(job.id) + self.assertEqual(set(job_check.meta.keys()), + set(['foo', 'baz', 'newinfo'])) self.assertEqual(job_check.meta['foo'], 'bar') self.assertEqual(job_check.meta['baz'], 10) self.assertEqual(job_check.meta['newinfo'], 'waka') From efb30b45a36d66cc5b7948f6f1fb658378b9f57a Mon Sep 17 00:00:00 2001 From: Benjamin Root Date: Fri, 27 Jan 2017 13:37:30 -0500 Subject: [PATCH 4/4] Take a different tack, and enable the exclusion of meta from saves --- rq/job.py | 25 +++++++++++++++++++------ rq/queue.py | 4 ++-- rq/registry.py | 2 +- rq/worker.py | 5 ++--- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/rq/job.py b/rq/job.py index b820f79..c6e41a8 100644 --- a/rq/job.py +++ b/rq/job.py @@ -422,8 +422,14 @@ class Job(object): self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} - def to_dict(self): - """Returns a serialization of the current job instance""" + def to_dict(self, include_meta=True): + """ + Returns a serialization of the current job instance + + You can exclude serializing the `meta` dictionary by setting + `include_meta=False`. + + """ obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) obj['data'] = self.data @@ -450,19 +456,26 @@ class Job(object): obj['status'] = self._status if self._dependency_id is not None: obj['dependency_id'] = self._dependency_id - if self.meta: + if self.meta and include_meta: obj['meta'] = dumps(self.meta) if self.ttl: obj['ttl'] = self.ttl return obj - def save(self, pipeline=None): - """Persists the current job instance to its corresponding Redis key.""" + def save(self, pipeline=None, include_meta=True): + """ + Persists the current job instance to its corresponding Redis key. + + Exclude persisting the `meta` dictionary by setting + `include_meta=False`. This is useful to prevent clobbering + user metadata without an expensive `refresh()` call first. + + """ key = self.key connection = pipeline if pipeline is not None else self.connection - connection.hmset(key, self.to_dict()) + connection.hmset(key, self.to_dict(include_meta=include_meta)) self.cleanup(self.ttl, pipeline=connection) def cancel(self): diff --git a/rq/queue.py b/rq/queue.py index 417141a..3bba95e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -229,7 +229,7 @@ class Queue(object): if not self._async: job.perform() job.set_status(JobStatus.FINISHED) - job.save() + job.save(include_meta=False) job.cleanup(DEFAULT_RESULT_TTL) return job @@ -474,7 +474,7 @@ class FailedQueue(Queue): job.ended_at = utcnow() job.exc_info = exc_info - job.save(pipeline=pipeline) + job.save(pipeline=pipeline, include_meta=False) self.push_job_id(job.id, pipeline=pipeline) pipeline.execute() diff --git a/rq/registry.py b/rq/registry.py index 65b9796..7a63cf2 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -89,7 +89,7 @@ class StartedJobRegistry(BaseRegistry): try: job = Job.fetch(job_id, connection=self.connection) job.set_status(JobStatus.FAILED) - job.save(pipeline=pipeline) + job.save(pipeline=pipeline, include_meta=False) failed_queue.push_job_id(job_id, pipeline=pipeline) except NoSuchJobError: pass diff --git a/rq/worker.py b/rq/worker.py index fb7047a..7cb6069 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -677,7 +677,8 @@ class Worker(object): result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.set_status(JobStatus.FINISHED, pipeline=pipeline) - job.save(pipeline=pipeline) + # Don't clobber the user's meta dictionary! + job.save(pipeline=pipeline, include_meta=False) finished_job_registry = FinishedJobRegistry(job.origin, self.connection) @@ -705,7 +706,6 @@ class Worker(object): try: with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): rv = job.perform() - job.refresh() job.ended_at = utcnow() @@ -719,7 +719,6 @@ class Worker(object): started_job_registry=started_job_registry ) except Exception: - job.refresh() self.handle_job_failure( job=job, started_job_registry=started_job_registry