diff --git a/CHANGES.md b/CHANGES.md index 53d4c67..ae15bd4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ ### 0.4.0 (not released yet) +- Job data is unpickled lazily. This fixes issue #294. + - Removed dependency on the `times` library. Thanks, Malthe! - Job dependencies! Thanks, Selwin. diff --git a/rq/job.py b/rq/job.py index ebe38a0..1785ea5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,5 +1,6 @@ import inspect from uuid import uuid4 +from functools import wraps try: from cPickle import loads, dumps, UnpicklingError except ImportError: # noqa @@ -62,9 +63,28 @@ def get_current_job(): return Job.fetch(job_id) +def lazy(f): + """Decorator for a lazy data property.""" + + attr = "_" + f.__name__ + + @wraps(f) + def decorator(job): + if job.data is not None: + job._func_name, job._instance, job._args, job._kwargs = unpickle(job.data) + del job.data + + return getattr(job, attr) + + return property(decorator) + + class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + + data = None + # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, @@ -97,7 +117,7 @@ class Job(object): job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on return job - @property + @lazy def func_name(self): return self._func_name @@ -152,15 +172,15 @@ class Job(object): return import_attribute(self.func_name) - @property + @lazy def instance(self): return self._instance - @property + @lazy def args(self): return self._args - @property + @lazy def kwargs(self): return self._kwargs @@ -179,15 +199,6 @@ class Job(object): job.refresh() return job - @classmethod - def safe_fetch(cls, id, connection=None): - """Fetches a persisted job from its corresponding Redis key, but does - not instantiate it, making it impossible to get UnpickleErrors. - """ - job = cls(id, connection=connection) - job.refresh(safe=True) - return job - def __init__(self, id=None, connection=None): self.connection = resolve_connection(connection) self._id = id @@ -279,7 +290,7 @@ class Job(object): # Persistence - def refresh(self, safe=False): # noqa + def refresh(self): # noqa """Overwrite the current instance's properties with the values in the corresponding Redis key. @@ -301,11 +312,6 @@ class Job(object): except KeyError: raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) - try: - self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data) - except UnpickleError: - if not safe: - raise self.created_at = to_date(as_text(obj.get('created_at'))) self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) @@ -324,8 +330,11 @@ class Job(object): obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - if self.func_name is not None: + if self.data is not None: + obj['data'] = self.data + elif self.func_name is not None: obj['data'] = dumps(self.job_tuple) + if self.origin is not None: obj['origin'] = self.origin if self.description is not None: diff --git a/rq/queue.py b/rq/queue.py index 4bedbee..d4cbb54 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,15 +74,11 @@ class Queue(object): """Returns whether the current queue is empty.""" return self.count == 0 - def safe_fetch_job(self, job_id): + def fetch_job(self, job_id): try: - job = Job.safe_fetch(job_id, connection=self.connection) + return Job.fetch(job_id, connection=self.connection) except NoSuchJobError: self.remove(job_id) - return None - except UnpickleError: - return None - return job def get_job_ids(self, offset=0, length=-1): """Returns a slice of job IDs in the queue.""" @@ -97,7 +93,7 @@ class Queue(object): def get_jobs(self, offset=0, length=-1): """Returns a slice of jobs in the queue.""" job_ids = self.get_job_ids(offset, length) - return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) + return compact([self.fetch_job(job_id) for job_id in job_ids]) @property def job_ids(self): diff --git a/rq/worker.py b/rq/worker.py index 7fa8545..0321ca0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -308,10 +308,6 @@ class Worker(object): break except StopRequested: break - except UnpickleError as e: - job = Job.safe_fetch(e.job_id, connection=self.connection) - self.handle_exception(job, *sys.exc_info()) - continue self.state = 'busy' diff --git a/tests/test_job.py b/tests/test_job.py index 03d8155..f7252fa 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -170,15 +170,18 @@ class TestJob(RQTestCase): Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') def test_fetching_unreadable_data(self): - """Fetching fails on unreadable data.""" + """Fetching succeeds on unreadable data, but lazy props fail.""" # Set up job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise self.testconn.hset(job.key, 'data', 'this is no pickle string') - with self.assertRaises(UnpickleError): - job.refresh() + job.refresh() + + for attr in ('func_name', 'instance', 'args', 'kwargs'): + with self.assertRaises(UnpickleError): + getattr(job, attr) def test_job_is_unimportable(self): """Jobs that cannot be imported throw exception on access."""