From 44233709eddb8aca0ec5904642be67a273b07f13 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Jan 2014 00:22:13 +0100 Subject: [PATCH] Allow two-way setting of job data properties. As a side-effect, this also enables us to lazily load the data. --- rq/job.py | 107 ++++++++++++++++++++++++++++++++-------------- tests/test_job.py | 53 +++++++++++++++-------- 2 files changed, 110 insertions(+), 50 deletions(-) diff --git a/rq/job.py b/rq/job.py index c125189..8d6b9fc 100644 --- a/rq/job.py +++ b/rq/job.py @@ -19,6 +19,10 @@ Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed', STARTED='started') +# Sentinel value to mark that some of our lazily evaluated properties have not +# yet been evaluated. +UNEVALUATED = object() + def unpickle(pickled_string): """Unpickles a string, but raises a unified UnpickleError in case anything @@ -66,8 +70,6 @@ class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ - data = None - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, @@ -86,6 +88,9 @@ class Job(object): raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs)) job = cls(connection=connection) + + # Set the core job tuple properties + job._instance = None if inspect.ismethod(func): job._instance = func.__self__ job._func_name = func.__name__ @@ -95,10 +100,13 @@ class Job(object): job._func_name = func job._args = args job._kwargs = kwargs + + # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl job.timeout = timeout job._status = status + # dependency could be job instance or id if depends_on is not None: job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on @@ -155,30 +163,79 @@ class Job(object): return import_attribute(self.func_name) - def _get_lazy(self, name): - if self.data is not None: - self._func_name, self._instance, self._args, self._kwargs = \ - unpickle(self.data) + def _unpickle_data(self): + self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data) + + @property + def data(self): + if self._data is UNEVALUATED: + if self._func_name is UNEVALUATED: + raise ValueError('Cannot build the job data.') + + if self._instance is UNEVALUATED: + self._instance = None + + if self._args is UNEVALUATED: + self._args = () + + if self._kwargs is UNEVALUATED: + self._kwargs = {} - del self.data + job_tuple = self._func_name, self._instance, self._args, self._kwargs + self._data = dumps(job_tuple) + return self._data - return getattr(self, "_" + name) + @data.setter + def data(self, value): + self._data = value + self._func_name = UNEVALUATED + self._instance = UNEVALUATED + self._args = UNEVALUATED + self._kwargs = UNEVALUATED @property def func_name(self): - return self._get_lazy('func_name') + if self._func_name is UNEVALUATED: + self._unpickle_data() + return self._func_name + + @func_name.setter + def func_name(self, value): + self._func_name = value + self._data = UNEVALUATED @property def instance(self): - return self._get_lazy('instance') + if self._instance is UNEVALUATED: + self._unpickle_data() + return self._instance + + @instance.setter + def instance(self, value): + self._instance = value + self._data = UNEVALUATED @property def args(self): - return self._get_lazy('args') + if self._args is UNEVALUATED: + self._unpickle_data() + return self._args + + @args.setter + def args(self, value): + self._args = value + self._data = UNEVALUATED @property def kwargs(self): - return self._get_lazy('kwargs') + if self._kwargs is UNEVALUATED: + self._unpickle_data() + return self._kwargs + + @kwargs.setter + def kwargs(self, value): + self._kwargs = value + self._data = UNEVALUATED @classmethod def exists(cls, job_id, connection=None): @@ -199,10 +256,11 @@ class Job(object): self.connection = resolve_connection(connection) self._id = id self.created_at = utcnow() - self._func_name = None - self._instance = None - self._args = None - self._kwargs = None + self._data = UNEVALUATED + self._func_name = UNEVALUATED + self._instance = UNEVALUATED + self._args = UNEVALUATED + self._kwargs = UNEVALUATED self.description = None self.origin = None self.enqueued_at = None @@ -215,7 +273,6 @@ class Job(object): self._dependency_id = None self.meta = {} - # Data access def get_id(self): # noqa """The job ID for this job instance. Generates an ID lazily the @@ -251,12 +308,6 @@ class Job(object): """The Redis key that is used to store job hash under.""" return self.dependents_key_for(self.id) - @property # noqa - def job_tuple(self): - """Returns the job tuple that encodes the actual function call that - this job represents.""" - return (self.func_name, self.instance, self.args, self.kwargs) - @property def result(self): """Returns the return value of the job. @@ -284,7 +335,6 @@ class Job(object): """Backwards-compatibility accessor property `return_value`.""" return_value = result - # Persistence def refresh(self): # noqa """Overwrite the current instance's properties with the values in the @@ -316,7 +366,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) self._dependency_id = as_text(obj.get('dependency_id', None)) self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} @@ -325,11 +375,7 @@ class Job(object): """Returns a serialization of the current job instance""" obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - - if self.data is not None: - obj['data'] = self.data - elif self.func_name is not None: - obj['data'] = dumps(self.job_tuple) + obj['data'] = self.data if self.origin is not None: obj['origin'] = self.origin @@ -443,7 +489,6 @@ class Job(object): def __str__(self): return '' % (self.id, self.description) - # Job equality def __eq__(self, other): # noqa return self.id == other.id diff --git a/tests/test_job.py b/tests/test_job.py index f7252fa..06c549c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -3,9 +3,9 @@ from tests import RQTestCase from tests.fixtures import Number, some_calculation, say_hello, access_self from tests.helpers import strip_microseconds try: - from cPickle import loads + from cPickle import loads, dumps except ImportError: - from pickle import loads + from pickle import loads, dumps from rq.compat import as_text from rq.job import Job, get_current_job from rq.exceptions import NoSuchJobError, UnpickleError @@ -23,16 +23,21 @@ class TestJob(RQTestCase): self.assertIsNotNone(job.created_at) # ...and nothing else - self.assertIsNone(job.func) - self.assertIsNone(job.instance) - self.assertIsNone(job.args) - self.assertIsNone(job.kwargs) self.assertIsNone(job.origin) self.assertIsNone(job.enqueued_at) self.assertIsNone(job.ended_at) self.assertIsNone(job.result) self.assertIsNone(job.exc_info) + with self.assertRaises(ValueError): + self.assertIsNone(job.func) + with self.assertRaises(ValueError): + self.assertIsNone(job.instance) + with self.assertRaises(ValueError): + self.assertIsNone(job.args) + with self.assertRaises(ValueError): + self.assertIsNone(job.kwargs) + def test_create_typical_job(self): """Creation of jobs for function calls.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -72,6 +77,27 @@ class TestJob(RQTestCase): self.assertIsNone(job.instance) self.assertEquals(job.args, ('World',)) + def test_job_properties_set_data_property(self): + """Data property gets derived from the job tuple.""" + job = Job() + job.func_name = 'foo' + fname, instance, args, kwargs = loads(job.data) + + self.assertEquals(fname, job.func_name) + self.assertEquals(instance, None) + self.assertEquals(args, ()) + self.assertEquals(kwargs, {}) + + def test_data_property(self): + """Job tuple gets derived lazily from data property.""" + job = Job() + job.data = dumps(('foo', None, (1, 2, 3), {'bar': 'qux'})) + + self.assertEquals(job.func_name, 'foo') + self.assertEquals(job.instance, None) + self.assertEquals(job.args, (1, 2, 3)) + self.assertEquals(job.kwargs, {'bar': 'qux'}) + def test_save(self): # noqa """Storing jobs.""" job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) @@ -102,22 +128,11 @@ class TestJob(RQTestCase): self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) - def test_persistence_of_empty_jobs(self): # noqa """Storing empty jobs.""" job = Job() - job.save() - - expected_date = strip_microseconds(job.created_at) - stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8') - self.assertEquals( - stored_date, - utcformat(expected_date)) - - # ... and no other keys are stored - self.assertEqual( - self.testconn.hkeys(job.key), - [b'created_at']) + with self.assertRaises(ValueError): + job.save() def test_persistence_of_typical_jobs(self): """Storing typical jobs."""