From 15342f14d3e5ee853a6fc1ba9abd9a6808c1817f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 21 Mar 2012 13:08:26 +0100 Subject: [PATCH] Store pickled function calls as strings. This aids unpacking in the case of a function that isn't importable from the worker's runtime. The unpickling will now (almost) always succeed, and throw an ImportError later on, when the function is actually accessed (thus imported implicitly). The end result is a job on the failed queue, with exc_info describing the import error, which is tremendously useful. --- rq/job.py | 33 ++++++++++++++++++++++----------- rq/worker.py | 2 +- setup.py | 2 +- tests/test_job.py | 19 +++---------------- tests/test_worker.py | 24 +++++++++++++++++++++++- 5 files changed, 50 insertions(+), 30 deletions(-) diff --git a/rq/job.py b/rq/job.py index dc41bad..853f227 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,3 +1,4 @@ +import importlib import times from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError @@ -15,9 +16,6 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except AttributeError as e: - raise UnpickleError('Could not unpickle: %s' % e.message, - pickled_string) except (StandardError, UnpicklingError): raise UnpickleError('Could not unpickle.', pickled_string) return obj @@ -34,15 +32,28 @@ class Job(object): keyword arguments. """ job = Job() - job._func = func + job._func_name = '%s.%s' % (func.__module__, func.__name__) job._args = args job._kwargs = kwargs job.description = job.get_call_string() return job + @property + def func_name(self): + return self._func_name + @property def func(self): - return self._func + import warnings + warnings.warn('Don\'t use this!', DeprecationWarning) + + func_name = self.func_name + if func_name is None: + return None + + module_name, func_name = func_name.rsplit('.', 1) + module = importlib.import_module(module_name) + return getattr(module, func_name) @property def args(self): @@ -69,7 +80,7 @@ class Job(object): def __init__(self, id=None): self._id = id self.created_at = times.now() - self._func = None + self._func_name = None self._args = None self._kwargs = None self.description = None @@ -111,7 +122,7 @@ class Job(object): def job_tuple(self): """Returns the job tuple that encodes the actual function call that this job represents.""" - return (self.func, self.args, self.kwargs) + return (self.func_name, self.args, self.kwargs) @property def return_value(self): @@ -160,7 +171,7 @@ class Job(object): else: return times.to_universal(date_str) - self._func, self._args, self._kwargs = unpickle(data) + self._func_name, self._args, self._kwargs = unpickle(data) self.created_at = to_date(created_at) self.origin = origin self.description = description @@ -180,7 +191,7 @@ class Job(object): obj = {} obj['created_at'] = times.format(self.created_at, 'UTC') - if self.func is not None: + if self.func_name is not None: obj['data'] = dumps(self.job_tuple) if self.origin is not None: obj['origin'] = self.origin @@ -227,13 +238,13 @@ class Job(object): """Returns a string representation of the call, formatted as a regular Python function invocation statement. """ - if self.func is None: + if self.func_name is None: return None arg_list = [repr(arg) for arg in self.args] arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] args = ', '.join(arg_list) - return '%s(%s)' % (self.func.__name__, args) + return '%s(%s)' % (self.func_name, args) def __str__(self): return '' % (self.id, self.description) diff --git a/rq/worker.py b/rq/worker.py index 142948a..92da8b0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -339,7 +339,7 @@ class Worker(object): inside the work horse's process. """ self.procline('Processing %s from %s since %s' % ( - job.func.__name__, + job.func_name, job.origin, time.time())) try: diff --git a/setup.py b/setup.py index 5263a04..b1672af 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_version(): raise RuntimeError('No version info found.') def get_dependencies(): - deps = ['redis', 'procname', 'times'] + deps = ['redis', 'procname', 'importlib', 'times'] deps += ['logbook'] # should be soft dependency? if sys.version_info < (2, 7) or \ sys.version_info >= (3, 0) and sys.version_info < (3, 2): diff --git a/tests/test_job.py b/tests/test_job.py index ff260d5..7dc2c22 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -58,20 +58,20 @@ class TestJob(RQTestCase): # Saving writes pickled job data unpickled_data = loads(self.testconn.hget(job.key, 'data')) - self.assertEquals(unpickled_data[0], some_calculation) + self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation') def test_fetch(self): """Fetching jobs.""" # Prepare test self.testconn.hset('rq:job:some_id', 'data', - "(ctest_job\nsome_calculation\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa + "(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") # Fetch returns a job job = Job.fetch('some_id') self.assertEquals(job.id, 'some_id') - self.assertEquals(job.func, some_calculation) + self.assertEquals(job.func_name, 'tests.fixtures.some_calculation') self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) @@ -137,16 +137,3 @@ class TestJob(RQTestCase): self.testconn.hset(job.key, 'data', 'this is no pickle string') with self.assertRaises(UnpickleError): job.refresh() - - # Set up (part B) - job = Job.create(some_calculation, 3, 4, z=2) - job.save() - - # Now slightly modify the job to make it unpickl'able (this is - # equivalent to a worker not having the most up-to-date source code and - # unable to import the function) - data = self.testconn.hget(job.key, 'data') - unimportable_data = data.replace('some_calculation', 'broken') - self.testconn.hset(job.key, 'data', unimportable_data) - with self.assertRaises(UnpickleError): - job.refresh() diff --git a/tests/test_worker.py b/tests/test_worker.py index 13f3e8f..3eac3f4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -54,6 +54,24 @@ class TestWorker(RQTestCase): self.assertEquals(q.count, 0) self.assertEquals(failed_q.count, 1) + def test_work_is_unimportable(self): + """Jobs that cannot be imported are put on the failed queue.""" + q = Queue() + + job = q.enqueue(say_hello, 'Lionel') + job.save() + + # Now slightly modify the job to make it unimportable (this is + # equivalent to a worker not having the most up-to-date source code + # and unable to import the function) + data = self.testconn.hget(job.key, 'data') + unimportable_data = data.replace('say_hello', 'shut_up') + self.testconn.hset(job.key, 'data', unimportable_data) + + job.refresh() + with self.assertRaises((ImportError, AttributeError)): + job.func # accessing the func property should fail + def test_work_fails(self): """Failing jobs are put on the failed queue.""" q = Queue() @@ -128,10 +146,14 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) + # First, assert that the job executed successfully + assert self.testconn.hget(job_with_rv.key, 'exc_info') is None + assert self.testconn.hget(job_without_rv.key, 'exc_info') is None + # Jobs with results expire after a certain TTL, while jobs without # results are immediately removed assert self.testconn.ttl(job_with_rv.key) > 0 - assert self.testconn.exists(job_without_rv.key) == False + assert not self.testconn.exists(job_without_rv.key) @slow # noqa