Unpickle job data lazily. This fixes issue #294.

main
Malthe Borch 11 years ago
parent 19e58027ac
commit ba0b39a43b

@ -1,6 +1,8 @@
### 0.4.0 ### 0.4.0
(not released yet) (not released yet)
- Job data is unpickled lazily. This fixes issue #294.
- Removed dependency on the `times` library. Thanks, Malthe! - Removed dependency on the `times` library. Thanks, Malthe!
- Job dependencies! Thanks, Selwin. - Job dependencies! Thanks, Selwin.

@ -1,5 +1,6 @@
import inspect import inspect
from uuid import uuid4 from uuid import uuid4
from functools import wraps
try: try:
from cPickle import loads, dumps, UnpicklingError from cPickle import loads, dumps, UnpicklingError
except ImportError: # noqa except ImportError: # noqa
@ -62,9 +63,28 @@ def get_current_job():
return Job.fetch(job_id) return Job.fetch(job_id)
def lazy(f):
"""Decorator for a lazy data property."""
attr = "_" + f.__name__
@wraps(f)
def decorator(job):
if job.data is not None:
job._func_name, job._instance, job._args, job._kwargs = unpickle(job.data)
del job.data
return getattr(job, attr)
return property(decorator)
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
data = None
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
@ -97,7 +117,7 @@ class Job(object):
job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
return job return job
@property @lazy
def func_name(self): def func_name(self):
return self._func_name return self._func_name
@ -152,15 +172,15 @@ class Job(object):
return import_attribute(self.func_name) return import_attribute(self.func_name)
@property @lazy
def instance(self): def instance(self):
return self._instance return self._instance
@property @lazy
def args(self): def args(self):
return self._args return self._args
@property @lazy
def kwargs(self): def kwargs(self):
return self._kwargs return self._kwargs
@ -179,15 +199,6 @@ class Job(object):
job.refresh() job.refresh()
return job return job
@classmethod
def safe_fetch(cls, id, connection=None):
"""Fetches a persisted job from its corresponding Redis key, but does
not instantiate it, making it impossible to get UnpickleErrors.
"""
job = cls(id, connection=connection)
job.refresh(safe=True)
return job
def __init__(self, id=None, connection=None): def __init__(self, id=None, connection=None):
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
self._id = id self._id = id
@ -279,7 +290,7 @@ class Job(object):
# Persistence # Persistence
def refresh(self, safe=False): # noqa def refresh(self): # noqa
"""Overwrite the current instance's properties with the values in the """Overwrite the current instance's properties with the values in the
corresponding Redis key. corresponding Redis key.
@ -301,11 +312,6 @@ class Job(object):
except KeyError: except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) raise NoSuchJobError('Unexpected job format: {0}'.format(obj))
try:
self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
except UnpickleError:
if not safe:
raise
self.created_at = to_date(as_text(obj.get('created_at'))) self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin')) self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description')) self.description = as_text(obj.get('description'))
@ -324,8 +330,11 @@ class Job(object):
obj = {} obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow()) obj['created_at'] = utcformat(self.created_at or utcnow())
if self.func_name is not None: if self.data is not None:
obj['data'] = self.data
elif self.func_name is not None:
obj['data'] = dumps(self.job_tuple) obj['data'] = dumps(self.job_tuple)
if self.origin is not None: if self.origin is not None:
obj['origin'] = self.origin obj['origin'] = self.origin
if self.description is not None: if self.description is not None:

@ -74,15 +74,11 @@ class Queue(object):
"""Returns whether the current queue is empty.""" """Returns whether the current queue is empty."""
return self.count == 0 return self.count == 0
def safe_fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
job = Job.safe_fetch(job_id, connection=self.connection) return Job.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
return None
except UnpickleError:
return None
return job
def get_job_ids(self, offset=0, length=-1): def get_job_ids(self, offset=0, length=-1):
"""Returns a slice of job IDs in the queue.""" """Returns a slice of job IDs in the queue."""
@ -97,7 +93,7 @@ class Queue(object):
def get_jobs(self, offset=0, length=-1): def get_jobs(self, offset=0, length=-1):
"""Returns a slice of jobs in the queue.""" """Returns a slice of jobs in the queue."""
job_ids = self.get_job_ids(offset, length) job_ids = self.get_job_ids(offset, length)
return compact([self.safe_fetch_job(job_id) for job_id in job_ids]) return compact([self.fetch_job(job_id) for job_id in job_ids])
@property @property
def job_ids(self): def job_ids(self):

@ -308,10 +308,6 @@ class Worker(object):
break break
except StopRequested: except StopRequested:
break break
except UnpickleError as e:
job = Job.safe_fetch(e.job_id, connection=self.connection)
self.handle_exception(job, *sys.exc_info())
continue
self.state = 'busy' self.state = 'busy'

@ -170,16 +170,19 @@ class TestJob(RQTestCase):
Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
def test_fetching_unreadable_data(self): def test_fetching_unreadable_data(self):
"""Fetching fails on unreadable data.""" """Fetching succeeds on unreadable data, but lazy props fail."""
# Set up # Set up
job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2)) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save() job.save()
# Just replace the data hkey with some random noise # Just replace the data hkey with some random noise
self.testconn.hset(job.key, 'data', 'this is no pickle string') self.testconn.hset(job.key, 'data', 'this is no pickle string')
with self.assertRaises(UnpickleError):
job.refresh() job.refresh()
for attr in ('func_name', 'instance', 'args', 'kwargs'):
with self.assertRaises(UnpickleError):
getattr(job, attr)
def test_job_is_unimportable(self): def test_job_is_unimportable(self):
"""Jobs that cannot be imported throw exception on access.""" """Jobs that cannot be imported throw exception on access."""
job = Job.create(func=say_hello, args=('Lionel',)) job = Job.create(func=say_hello, args=('Lionel',))

Loading…
Cancel
Save