Store pickled function calls as strings.

This aids unpacking in the case of a function that isn't importable from
the worker's runtime. The unpickling will now (almost) always succeed,
and throw an ImportError later on, when the function is actually
accessed (thus imported implicitly).

The end result is a job on the failed queue, with exc_info describing
the import error, which is tremendously useful.
main
Vincent Driessen 13 years ago
parent 14ecb8e956
commit 15342f14d3

@ -1,3 +1,4 @@
import importlib
import times import times
from uuid import uuid4 from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError from cPickle import loads, dumps, UnpicklingError
@ -15,9 +16,6 @@ def unpickle(pickled_string):
""" """
try: try:
obj = loads(pickled_string) obj = loads(pickled_string)
except AttributeError as e:
raise UnpickleError('Could not unpickle: %s' % e.message,
pickled_string)
except (StandardError, UnpicklingError): except (StandardError, UnpicklingError):
raise UnpickleError('Could not unpickle.', pickled_string) raise UnpickleError('Could not unpickle.', pickled_string)
return obj return obj
@ -34,15 +32,28 @@ class Job(object):
keyword arguments. keyword arguments.
""" """
job = Job() job = Job()
job._func = func job._func_name = '%s.%s' % (func.__module__, func.__name__)
job._args = args job._args = args
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = job.get_call_string()
return job return job
@property
def func_name(self):
return self._func_name
@property @property
def func(self): def func(self):
return self._func import warnings
warnings.warn('Don\'t use this!', DeprecationWarning)
func_name = self.func_name
if func_name is None:
return None
module_name, func_name = func_name.rsplit('.', 1)
module = importlib.import_module(module_name)
return getattr(module, func_name)
@property @property
def args(self): def args(self):
@ -69,7 +80,7 @@ class Job(object):
def __init__(self, id=None): def __init__(self, id=None):
self._id = id self._id = id
self.created_at = times.now() self.created_at = times.now()
self._func = None self._func_name = None
self._args = None self._args = None
self._kwargs = None self._kwargs = None
self.description = None self.description = None
@ -111,7 +122,7 @@ class Job(object):
def job_tuple(self): def job_tuple(self):
"""Returns the job tuple that encodes the actual function call that """Returns the job tuple that encodes the actual function call that
this job represents.""" this job represents."""
return (self.func, self.args, self.kwargs) return (self.func_name, self.args, self.kwargs)
@property @property
def return_value(self): def return_value(self):
@ -160,7 +171,7 @@ class Job(object):
else: else:
return times.to_universal(date_str) return times.to_universal(date_str)
self._func, self._args, self._kwargs = unpickle(data) self._func_name, self._args, self._kwargs = unpickle(data)
self.created_at = to_date(created_at) self.created_at = to_date(created_at)
self.origin = origin self.origin = origin
self.description = description self.description = description
@ -180,7 +191,7 @@ class Job(object):
obj = {} obj = {}
obj['created_at'] = times.format(self.created_at, 'UTC') obj['created_at'] = times.format(self.created_at, 'UTC')
if self.func is not None: if self.func_name is not None:
obj['data'] = dumps(self.job_tuple) obj['data'] = dumps(self.job_tuple)
if self.origin is not None: if self.origin is not None:
obj['origin'] = self.origin obj['origin'] = self.origin
@ -227,13 +238,13 @@ class Job(object):
"""Returns a string representation of the call, formatted as a regular """Returns a string representation of the call, formatted as a regular
Python function invocation statement. Python function invocation statement.
""" """
if self.func is None: if self.func_name is None:
return None return None
arg_list = [repr(arg) for arg in self.args] arg_list = [repr(arg) for arg in self.args]
arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()]
args = ', '.join(arg_list) args = ', '.join(arg_list)
return '%s(%s)' % (self.func.__name__, args) return '%s(%s)' % (self.func_name, args)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -339,7 +339,7 @@ class Worker(object):
inside the work horse's process. inside the work horse's process.
""" """
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
job.func.__name__, job.func_name,
job.origin, time.time())) job.origin, time.time()))
try: try:

@ -15,7 +15,7 @@ def get_version():
raise RuntimeError('No version info found.') raise RuntimeError('No version info found.')
def get_dependencies(): def get_dependencies():
deps = ['redis', 'procname', 'times'] deps = ['redis', 'procname', 'importlib', 'times']
deps += ['logbook'] # should be soft dependency? deps += ['logbook'] # should be soft dependency?
if sys.version_info < (2, 7) or \ if sys.version_info < (2, 7) or \
sys.version_info >= (3, 0) and sys.version_info < (3, 2): sys.version_info >= (3, 0) and sys.version_info < (3, 2):

@ -58,20 +58,20 @@ class TestJob(RQTestCase):
# Saving writes pickled job data # Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data')) unpickled_data = loads(self.testconn.hget(job.key, 'data'))
self.assertEquals(unpickled_data[0], some_calculation) self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation')
def test_fetch(self): def test_fetch(self):
"""Fetching jobs.""" """Fetching jobs."""
# Prepare test # Prepare test
self.testconn.hset('rq:job:some_id', 'data', self.testconn.hset('rq:job:some_id', 'data',
"(ctest_job\nsome_calculation\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa "(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa
self.testconn.hset('rq:job:some_id', 'created_at', self.testconn.hset('rq:job:some_id', 'created_at',
"2012-02-07 22:13:24+0000") "2012-02-07 22:13:24+0000")
# Fetch returns a job # Fetch returns a job
job = Job.fetch('some_id') job = Job.fetch('some_id')
self.assertEquals(job.id, 'some_id') self.assertEquals(job.id, 'some_id')
self.assertEquals(job.func, some_calculation) self.assertEquals(job.func_name, 'tests.fixtures.some_calculation')
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
@ -137,16 +137,3 @@ class TestJob(RQTestCase):
self.testconn.hset(job.key, 'data', 'this is no pickle string') self.testconn.hset(job.key, 'data', 'this is no pickle string')
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
job.refresh() job.refresh()
# Set up (part B)
job = Job.create(some_calculation, 3, 4, z=2)
job.save()
# Now slightly modify the job to make it unpickl'able (this is
# equivalent to a worker not having the most up-to-date source code and
# unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('some_calculation', 'broken')
self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError):
job.refresh()

@ -54,6 +54,24 @@ class TestWorker(RQTestCase):
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
self.assertEquals(failed_q.count, 1) self.assertEquals(failed_q.count, 1)
def test_work_is_unimportable(self):
"""Jobs that cannot be imported are put on the failed queue."""
q = Queue()
job = q.enqueue(say_hello, 'Lionel')
job.save()
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('say_hello', 'shut_up')
self.testconn.hset(job.key, 'data', unimportable_data)
job.refresh()
with self.assertRaises((ImportError, AttributeError)):
job.func # accessing the func property should fail
def test_work_fails(self): def test_work_fails(self):
"""Failing jobs are put on the failed queue.""" """Failing jobs are put on the failed queue."""
q = Queue() q = Queue()
@ -128,10 +146,14 @@ class TestWorker(RQTestCase):
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
# First, assert that the job executed successfully
assert self.testconn.hget(job_with_rv.key, 'exc_info') is None
assert self.testconn.hget(job_without_rv.key, 'exc_info') is None
# Jobs with results expire after a certain TTL, while jobs without # Jobs with results expire after a certain TTL, while jobs without
# results are immediately removed # results are immediately removed
assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.ttl(job_with_rv.key) > 0
assert self.testconn.exists(job_without_rv.key) == False assert not self.testconn.exists(job_without_rv.key)
@slow # noqa @slow # noqa

Loading…
Cancel
Save