From ba0b39a43bc911e4889f52a1776036bba3d56dcd Mon Sep 17 00:00:00 2001 From: Malthe Borch <mborch@gmail.com> Date: Mon, 30 Dec 2013 17:40:04 +0100 Subject: [PATCH 1/5] Unpickle job data lazily. This fixes issue #294. --- CHANGES.md | 2 ++ rq/job.py | 49 ++++++++++++++++++++++++++++------------------- rq/queue.py | 10 +++------- rq/worker.py | 4 ---- tests/test_job.py | 9 ++++++--- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 53d4c67..ae15bd4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ ### 0.4.0 (not released yet) +- Job data is unpickled lazily. This fixes issue #294. + - Removed dependency on the `times` library. Thanks, Malthe! - Job dependencies! Thanks, Selwin. diff --git a/rq/job.py b/rq/job.py index ebe38a0..1785ea5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,5 +1,6 @@ import inspect from uuid import uuid4 +from functools import wraps try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa @@ -62,9 +63,28 @@ def get_current_job(): return Job.fetch(job_id) +def lazy(f): + """Decorator for a lazy data property.""" + + attr = "_" + f.__name__ + + @wraps(f) + def decorator(job): + if job.data is not None: + job._func_name, job._instance, job._args, job._kwargs = unpickle(job.data) + del job.data + + return getattr(job, attr) + + return property(decorator) + + class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + + data = None + # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, @@ -97,7 +117,7 @@ class Job(object): job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on return job - @property + @lazy def func_name(self): return self._func_name @@ -152,15 +172,15 @@ class Job(object): return import_attribute(self.func_name) - @property + @lazy def instance(self): return self._instance - @property + @lazy def args(self): return self._args - @property + @lazy def kwargs(self): return self._kwargs @@ -179,15 +199,6 @@ class Job(object): job.refresh() return job - @classmethod - def safe_fetch(cls, id, connection=None): - """Fetches a persisted job from its corresponding Redis key, but does - not instantiate it, making it impossible to get UnpickleErrors. - """ - job = cls(id, connection=connection) - job.refresh(safe=True) - return job - def __init__(self, id=None, connection=None): self.connection = resolve_connection(connection) self._id = id @@ -279,7 +290,7 @@ class Job(object): # Persistence - def refresh(self, safe=False): # noqa + def refresh(self): # noqa """Overwrite the current instance's properties with the values in the corresponding Redis key. @@ -301,11 +312,6 @@ class Job(object): except KeyError: raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) - try: - self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data) - except UnpickleError: - if not safe: - raise self.created_at = to_date(as_text(obj.get('created_at'))) self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) @@ -324,8 +330,11 @@ class Job(object): obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - if self.func_name is not None: + if self.data is not None: + obj['data'] = self.data + elif self.func_name is not None: obj['data'] = dumps(self.job_tuple) + if self.origin is not None: obj['origin'] = self.origin if self.description is not None: diff --git a/rq/queue.py b/rq/queue.py index 4bedbee..d4cbb54 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,15 +74,11 @@ class Queue(object): """Returns whether the current queue is empty.""" return self.count == 0 - def safe_fetch_job(self, job_id): + def fetch_job(self, job_id): try: - job = Job.safe_fetch(job_id, connection=self.connection) + return Job.fetch(job_id, connection=self.connection) except NoSuchJobError: self.remove(job_id) - return None - except UnpickleError: - return None - return job def get_job_ids(self, offset=0, length=-1): """Returns a slice of job IDs in the queue.""" @@ -97,7 +93,7 @@ class Queue(object): def get_jobs(self, offset=0, length=-1): """Returns a slice of jobs in the queue.""" job_ids = self.get_job_ids(offset, length) - return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) + return compact([self.fetch_job(job_id) for job_id in job_ids]) @property def job_ids(self): diff --git a/rq/worker.py b/rq/worker.py index 7fa8545..0321ca0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -308,10 +308,6 @@ class Worker(object): break except StopRequested: break - except UnpickleError as e: - job = Job.safe_fetch(e.job_id, connection=self.connection) - self.handle_exception(job, *sys.exc_info()) - continue self.state = 'busy' diff --git a/tests/test_job.py b/tests/test_job.py index 03d8155..f7252fa 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -170,15 +170,18 @@ class TestJob(RQTestCase): Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') def test_fetching_unreadable_data(self): - """Fetching fails on unreadable data.""" + """Fetching succeeds on unreadable data, but lazy props fail.""" # Set up job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise self.testconn.hset(job.key, 'data', 'this is no pickle string') - with self.assertRaises(UnpickleError): - job.refresh() + job.refresh() + + for attr in ('func_name', 'instance', 'args', 'kwargs'): + with self.assertRaises(UnpickleError): + getattr(job, attr) def test_job_is_unimportable(self): """Jobs that cannot be imported throw exception on access.""" From c898fe618ee0a2ff4453c232a7ae1e308cc2e6f0 Mon Sep 17 00:00:00 2001 From: Malthe Borch <mborch@gmail.com> Date: Fri, 3 Jan 2014 09:58:39 +0100 Subject: [PATCH 2/5] Move seemingly general purpose decorator into class scope, use and delete. Also, reduce repeat attribute declarations, at the cost of a requirement that decorated functions appear in the right order. --- rq/job.py | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/rq/job.py b/rq/job.py index 1785ea5..79e5f2d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -63,27 +63,28 @@ def get_current_job(): return Job.fetch(job_id) -def lazy(f): - """Decorator for a lazy data property.""" - - attr = "_" + f.__name__ +class Job(object): + """A Job is just a convenient datastructure to pass around job (meta) data. + """ - @wraps(f) - def decorator(job): - if job.data is not None: - job._func_name, job._instance, job._args, job._kwargs = unpickle(job.data) - del job.data + data = None - return getattr(job, attr) + def lazy(f, _attrs=[]): + attr = "_" + f.__name__ + _attrs.append(attr) - return property(decorator) + @wraps(f) + def decorator(job): + if job.data is not None: + payload = unpickle(job.data) + for name, value in zip(_attrs, payload): + setattr(job, name, value) + del job.data -class Job(object): - """A Job is just a convenient datastructure to pass around job (meta) data. - """ + return getattr(job, attr) - data = None + return property(decorator) # Job construction @classmethod @@ -117,10 +118,6 @@ class Job(object): job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on return job - @lazy - def func_name(self): - return self._func_name - def _get_status(self): self._status = as_text(self.connection.hget(self.key, 'status')) return self._status @@ -172,6 +169,13 @@ class Job(object): return import_attribute(self.func_name) + # Note: The order in which the following lazy attributes are + # declared is important. Don't change! + + @lazy + def func_name(self): + return self._func_name + @lazy def instance(self): return self._instance @@ -184,6 +188,8 @@ class Job(object): def kwargs(self): return self._kwargs + del lazy + @classmethod def exists(cls, job_id, connection=None): """Returns whether a job hash exists for the given job ID.""" From ce3924c901a4d3ae579d5a84f8ea91225429c7c1 Mon Sep 17 00:00:00 2001 From: Malthe Borch <mborch@gmail.com> Date: Fri, 3 Jan 2014 10:21:37 +0100 Subject: [PATCH 3/5] Just use a regular private method along with 'property'. --- rq/job.py | 46 ++++++++++++++++------------------------------ 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/rq/job.py b/rq/job.py index 79e5f2d..98b0b4d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,6 +1,5 @@ import inspect from uuid import uuid4 -from functools import wraps try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa @@ -69,23 +68,6 @@ class Job(object): data = None - def lazy(f, _attrs=[]): - attr = "_" + f.__name__ - _attrs.append(attr) - - @wraps(f) - def decorator(job): - if job.data is not None: - payload = unpickle(job.data) - for name, value in zip(_attrs, payload): - setattr(job, name, value) - - del job.data - - return getattr(job, attr) - - return property(decorator) - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, @@ -169,26 +151,30 @@ class Job(object): return import_attribute(self.func_name) - # Note: The order in which the following lazy attributes are - # declared is important. Don't change! + def _get_lazy(self, name): + if self.data is not None: + self._func_name, self._instance, self._args, self._kwargs = \ + unpickle(self.data) + + del self.data - @lazy + return getattr(self, "_" + name) + + @property def func_name(self): - return self._func_name + return self._get_lazy('func_name') - @lazy + @property def instance(self): - return self._instance + return self._get_lazy('instance') - @lazy + @property def args(self): - return self._args + return self._get_lazy('args') - @lazy + @property def kwargs(self): - return self._kwargs - - del lazy + return self._get_lazy('kwargs') @classmethod def exists(cls, job_id, connection=None): From 66a554bc29b9655e88fc65d1df900c7f0b427f7b Mon Sep 17 00:00:00 2001 From: Vincent Driessen <vincent@3rdcloud.com> Date: Mon, 6 Jan 2014 23:27:58 +0100 Subject: [PATCH 4/5] Change AssertionError to proper TypeError. --- rq/job.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rq/job.py b/rq/job.py index 98b0b4d..c125189 100644 --- a/rq/job.py +++ b/rq/job.py @@ -79,8 +79,12 @@ class Job(object): args = () if kwargs is None: kwargs = {} - assert isinstance(args, (tuple, list)), '%r is not a valid args list.' % (args,) - assert isinstance(kwargs, dict), '%r is not a valid kwargs dict.' % (kwargs,) + + if not isinstance(args, (tuple, list)): + raise TypeError('{0!r} is not a valid args list.'.format(args)) + if not isinstance(kwargs, dict): + raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) + job = cls(connection=connection) if inspect.ismethod(func): job._instance = func.__self__ From 44233709eddb8aca0ec5904642be67a273b07f13 Mon Sep 17 00:00:00 2001 From: Vincent Driessen <vincent@3rdcloud.com> Date: Tue, 7 Jan 2014 00:22:13 +0100 Subject: [PATCH 5/5] Allow two-way setting of job data properties. As a side-effect, this also enables us to lazily load the data. --- rq/job.py | 107 ++++++++++++++++++++++++++++++++-------------- tests/test_job.py | 53 +++++++++++++++-------- 2 files changed, 110 insertions(+), 50 deletions(-) diff --git a/rq/job.py b/rq/job.py index c125189..8d6b9fc 100644 --- a/rq/job.py +++ b/rq/job.py @@ -19,6 +19,10 @@ Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', STARTED='started') +# Sentinel value to mark that some of our lazily evaluated properties have not +# yet been evaluated. +UNEVALUATED = object() + def unpickle(pickled_string): """Unpickles a string, but raises a unified UnpickleError in case anything @@ -66,8 +70,6 @@ class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ - data = None - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, @@ -86,6 +88,9 @@ class Job(object): raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) job = cls(connection=connection) + + # Set the core job tuple properties + job._instance = None if inspect.ismethod(func): job._instance = func.__self__ job._func_name = func.__name__ @@ -95,10 +100,13 @@ class Job(object): job._func_name = func job._args = args job._kwargs = kwargs + + # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl job.timeout = timeout job._status = status + # dependency could be job instance or id if depends_on is not None: job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on @@ -155,30 +163,79 @@ class Job(object): return import_attribute(self.func_name) - def _get_lazy(self, name): - if self.data is not None: - self._func_name, self._instance, self._args, self._kwargs = \ - unpickle(self.data) + def _unpickle_data(self): + self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data) + + @property + def data(self): + if self._data is UNEVALUATED: + if self._func_name is UNEVALUATED: + raise ValueError('Cannot build the job data.') + + if self._instance is UNEVALUATED: + self._instance = None + + if self._args is UNEVALUATED: + self._args = () + + if self._kwargs is UNEVALUATED: + self._kwargs = {} - del self.data + job_tuple = self._func_name, self._instance, self._args, self._kwargs + self._data = dumps(job_tuple) + return self._data - return getattr(self, "_" + name) + @data.setter + def data(self, value): + self._data = value + self._func_name = UNEVALUATED + self._instance = UNEVALUATED + self._args = UNEVALUATED + self._kwargs = UNEVALUATED @property def func_name(self): - return self._get_lazy('func_name') + if self._func_name is UNEVALUATED: + self._unpickle_data() + return self._func_name + + @func_name.setter + def func_name(self, value): + self._func_name = value + self._data = UNEVALUATED @property def instance(self): - return self._get_lazy('instance') + if self._instance is UNEVALUATED: + self._unpickle_data() + return self._instance + + @instance.setter + def instance(self, value): + self._instance = value + self._data = UNEVALUATED @property def args(self): - return self._get_lazy('args') + if self._args is UNEVALUATED: + self._unpickle_data() + return self._args + + @args.setter + def args(self, value): + self._args = value + self._data = UNEVALUATED @property def kwargs(self): - return self._get_lazy('kwargs') + if self._kwargs is UNEVALUATED: + self._unpickle_data() + return self._kwargs + + @kwargs.setter + def kwargs(self, value): + self._kwargs = value + self._data = UNEVALUATED @classmethod def exists(cls, job_id, connection=None): @@ -199,10 +256,11 @@ class Job(object): self.connection = resolve_connection(connection) self._id = id self.created_at = utcnow() - self._func_name = None - self._instance = None - self._args = None - self._kwargs = None + self._data = UNEVALUATED + self._func_name = UNEVALUATED + self._instance = UNEVALUATED + self._args = UNEVALUATED + self._kwargs = UNEVALUATED self.description = None self.origin = None self.enqueued_at = None @@ -215,7 +273,6 @@ class Job(object): self._dependency_id = None self.meta = {} - # Data access def get_id(self): # noqa """The job ID for this job instance. Generates an ID lazily the @@ -251,12 +308,6 @@ class Job(object): """The Redis key that is used to store job hash under.""" return self.dependents_key_for(self.id) - @property # noqa - def job_tuple(self): - """Returns the job tuple that encodes the actual function call that - this job represents.""" - return (self.func_name, self.instance, self.args, self.kwargs) - @property def result(self): """Returns the return value of the job. @@ -284,7 +335,6 @@ class Job(object): """Backwards-compatibility accessor property `return_value`.""" return_value = result - # Persistence def refresh(self): # noqa """Overwrite the current instance's properties with the values in the @@ -316,7 +366,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self._dependency_id = as_text(obj.get('dependency_id', None)) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} @@ -325,11 +375,7 @@ class Job(object): """Returns a serialization of the current job instance""" obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - - if self.data is not None: - obj['data'] = self.data - elif self.func_name is not None: - obj['data'] = dumps(self.job_tuple) + obj['data'] = self.data if self.origin is not None: obj['origin'] = self.origin @@ -443,7 +489,6 @@ class Job(object): def __str__(self): return '<Job %s: %s>' % (self.id, self.description) - # Job equality def __eq__(self, other): # noqa return self.id == other.id diff --git a/tests/test_job.py b/tests/test_job.py index f7252fa..06c549c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -3,9 +3,9 @@ from tests import RQTestCase from tests.fixtures import Number, some_calculation, say_hello, access_self from tests.helpers import strip_microseconds try: - from cPickle import loads + from cPickle import loads, dumps except ImportError: - from pickle import loads + from pickle import loads, dumps from rq.compat import as_text from rq.job import Job, get_current_job from rq.exceptions import NoSuchJobError, UnpickleError @@ -23,16 +23,21 @@ class TestJob(RQTestCase): self.assertIsNotNone(job.created_at) # ...and nothing else - self.assertIsNone(job.func) - self.assertIsNone(job.instance) - self.assertIsNone(job.args) - self.assertIsNone(job.kwargs) self.assertIsNone(job.origin) self.assertIsNone(job.enqueued_at) self.assertIsNone(job.ended_at) self.assertIsNone(job.result) self.assertIsNone(job.exc_info) + with self.assertRaises(ValueError): + self.assertIsNone(job.func) + with self.assertRaises(ValueError): + self.assertIsNone(job.instance) + with self.assertRaises(ValueError): + self.assertIsNone(job.args) + with self.assertRaises(ValueError): + self.assertIsNone(job.kwargs) + def test_create_typical_job(self): """Creation of jobs for function calls.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -72,6 +77,27 @@ class TestJob(RQTestCase): self.assertIsNone(job.instance) self.assertEquals(job.args, ('World',)) + def test_job_properties_set_data_property(self): + """Data property gets derived from the job tuple.""" + job = Job() + job.func_name = 'foo' + fname, instance, args, kwargs = loads(job.data) + + self.assertEquals(fname, job.func_name) + self.assertEquals(instance, None) + self.assertEquals(args, ()) + self.assertEquals(kwargs, {}) + + def test_data_property(self): + """Job tuple gets derived lazily from data property.""" + job = Job() + job.data = dumps(('foo', None, (1, 2, 3), {'bar': 'qux'})) + + self.assertEquals(job.func_name, 'foo') + self.assertEquals(job.instance, None) + self.assertEquals(job.args, (1, 2, 3)) + self.assertEquals(job.kwargs, {'bar': 'qux'}) + def test_save(self): # noqa """Storing jobs.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -102,22 +128,11 @@ class TestJob(RQTestCase): self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) - def test_persistence_of_empty_jobs(self): # noqa """Storing empty jobs.""" job = Job() - job.save() - - expected_date = strip_microseconds(job.created_at) - stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8') - self.assertEquals( - stored_date, - utcformat(expected_date)) - - # ... and no other keys are stored - self.assertEqual( - self.testconn.hkeys(job.key), - [b'created_at']) + with self.assertRaises(ValueError): + job.save() def test_persistence_of_typical_jobs(self): """Storing typical jobs."""