Allow two-way setting of job data properties.

As a side-effect, this also enables us to lazily load the data.
main
Vincent Driessen 11 years ago
parent 66a554bc29
commit 44233709ed

@ -19,6 +19,10 @@ Status = enum('Status',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()
def unpickle(pickled_string):
"""Unpickles a string, but raises a unified UnpickleError in case anything
@ -66,8 +70,6 @@ class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data.
"""
data = None
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
@ -86,6 +88,9 @@ class Job(object):
raise TypeError('{0!r} is not a valid kwargs dict.'.format(kwargs))
job = cls(connection=connection)
# Set the core job tuple properties
job._instance = None
if inspect.ismethod(func):
job._instance = func.__self__
job._func_name = func.__name__
@ -95,10 +100,13 @@ class Job(object):
job._func_name = func
job._args = args
job._kwargs = kwargs
# Extra meta data
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job.timeout = timeout
job._status = status
# dependency could be job instance or id
if depends_on is not None:
job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
@ -155,30 +163,79 @@ class Job(object):
return import_attribute(self.func_name)
def _get_lazy(self, name):
if self.data is not None:
self._func_name, self._instance, self._args, self._kwargs = \
unpickle(self.data)
def _unpickle_data(self):
self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
@property
def data(self):
if self._data is UNEVALUATED:
if self._func_name is UNEVALUATED:
raise ValueError('Cannot build the job data.')
if self._instance is UNEVALUATED:
self._instance = None
if self._args is UNEVALUATED:
self._args = ()
if self._kwargs is UNEVALUATED:
self._kwargs = {}
del self.data
job_tuple = self._func_name, self._instance, self._args, self._kwargs
self._data = dumps(job_tuple)
return self._data
return getattr(self, "_" + name)
@data.setter
def data(self, value):
self._data = value
self._func_name = UNEVALUATED
self._instance = UNEVALUATED
self._args = UNEVALUATED
self._kwargs = UNEVALUATED
@property
def func_name(self):
return self._get_lazy('func_name')
if self._func_name is UNEVALUATED:
self._unpickle_data()
return self._func_name
@func_name.setter
def func_name(self, value):
self._func_name = value
self._data = UNEVALUATED
@property
def instance(self):
return self._get_lazy('instance')
if self._instance is UNEVALUATED:
self._unpickle_data()
return self._instance
@instance.setter
def instance(self, value):
self._instance = value
self._data = UNEVALUATED
@property
def args(self):
return self._get_lazy('args')
if self._args is UNEVALUATED:
self._unpickle_data()
return self._args
@args.setter
def args(self, value):
self._args = value
self._data = UNEVALUATED
@property
def kwargs(self):
return self._get_lazy('kwargs')
if self._kwargs is UNEVALUATED:
self._unpickle_data()
return self._kwargs
@kwargs.setter
def kwargs(self, value):
self._kwargs = value
self._data = UNEVALUATED
@classmethod
def exists(cls, job_id, connection=None):
@ -199,10 +256,11 @@ class Job(object):
self.connection = resolve_connection(connection)
self._id = id
self.created_at = utcnow()
self._func_name = None
self._instance = None
self._args = None
self._kwargs = None
self._data = UNEVALUATED
self._func_name = UNEVALUATED
self._instance = UNEVALUATED
self._args = UNEVALUATED
self._kwargs = UNEVALUATED
self.description = None
self.origin = None
self.enqueued_at = None
@ -215,7 +273,6 @@ class Job(object):
self._dependency_id = None
self.meta = {}
# Data access
def get_id(self): # noqa
"""The job ID for this job instance. Generates an ID lazily the
@ -251,12 +308,6 @@ class Job(object):
"""The Redis key that is used to store job hash under."""
return self.dependents_key_for(self.id)
@property # noqa
def job_tuple(self):
"""Returns the job tuple that encodes the actual function call that
this job represents."""
return (self.func_name, self.instance, self.args, self.kwargs)
@property
def result(self):
"""Returns the return value of the job.
@ -284,7 +335,6 @@ class Job(object):
"""Backwards-compatibility accessor property `return_value`."""
return_value = result
# Persistence
def refresh(self): # noqa
"""Overwrite the current instance's properties with the values in the
@ -316,7 +366,7 @@ class Job(object):
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
@ -325,11 +375,7 @@ class Job(object):
"""Returns a serialization of the current job instance"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
if self.data is not None:
obj['data'] = self.data
elif self.func_name is not None:
obj['data'] = dumps(self.job_tuple)
obj['data'] = self.data
if self.origin is not None:
obj['origin'] = self.origin
@ -443,7 +489,6 @@ class Job(object):
def __str__(self):
return '<Job %s: %s>' % (self.id, self.description)
# Job equality
def __eq__(self, other): # noqa
return self.id == other.id

@ -3,9 +3,9 @@ from tests import RQTestCase
from tests.fixtures import Number, some_calculation, say_hello, access_self
from tests.helpers import strip_microseconds
try:
from cPickle import loads
from cPickle import loads, dumps
except ImportError:
from pickle import loads
from pickle import loads, dumps
from rq.compat import as_text
from rq.job import Job, get_current_job
from rq.exceptions import NoSuchJobError, UnpickleError
@ -23,16 +23,21 @@ class TestJob(RQTestCase):
self.assertIsNotNone(job.created_at)
# ...and nothing else
self.assertIsNone(job.func)
self.assertIsNone(job.instance)
self.assertIsNone(job.args)
self.assertIsNone(job.kwargs)
self.assertIsNone(job.origin)
self.assertIsNone(job.enqueued_at)
self.assertIsNone(job.ended_at)
self.assertIsNone(job.result)
self.assertIsNone(job.exc_info)
with self.assertRaises(ValueError):
self.assertIsNone(job.func)
with self.assertRaises(ValueError):
self.assertIsNone(job.instance)
with self.assertRaises(ValueError):
self.assertIsNone(job.args)
with self.assertRaises(ValueError):
self.assertIsNone(job.kwargs)
def test_create_typical_job(self):
"""Creation of jobs for function calls."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
@ -72,6 +77,27 @@ class TestJob(RQTestCase):
self.assertIsNone(job.instance)
self.assertEquals(job.args, ('World',))
def test_job_properties_set_data_property(self):
"""Data property gets derived from the job tuple."""
job = Job()
job.func_name = 'foo'
fname, instance, args, kwargs = loads(job.data)
self.assertEquals(fname, job.func_name)
self.assertEquals(instance, None)
self.assertEquals(args, ())
self.assertEquals(kwargs, {})
def test_data_property(self):
"""Job tuple gets derived lazily from data property."""
job = Job()
job.data = dumps(('foo', None, (1, 2, 3), {'bar': 'qux'}))
self.assertEquals(job.func_name, 'foo')
self.assertEquals(job.instance, None)
self.assertEquals(job.args, (1, 2, 3))
self.assertEquals(job.kwargs, {'bar': 'qux'})
def test_save(self): # noqa
"""Storing jobs."""
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
@ -102,22 +128,11 @@ class TestJob(RQTestCase):
self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
def test_persistence_of_empty_jobs(self): # noqa
"""Storing empty jobs."""
job = Job()
job.save()
expected_date = strip_microseconds(job.created_at)
stored_date = self.testconn.hget(job.key, 'created_at').decode('utf-8')
self.assertEquals(
stored_date,
utcformat(expected_date))
# ... and no other keys are stored
self.assertEqual(
self.testconn.hkeys(job.key),
[b'created_at'])
with self.assertRaises(ValueError):
job.save()
def test_persistence_of_typical_jobs(self):
"""Storing typical jobs."""

Loading…
Cancel
Save