diff --git a/rq/job.py b/rq/job.py index dc41bad..853f227 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,3 +1,4 @@ +import importlib import times from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError @@ -15,9 +16,6 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except AttributeError as e: - raise UnpickleError('Could not unpickle: %s' % e.message, - pickled_string) except (StandardError, UnpicklingError): raise UnpickleError('Could not unpickle.', pickled_string) return obj @@ -34,15 +32,28 @@ class Job(object): keyword arguments. """ job = Job() - job._func = func + job._func_name = '%s.%s' % (func.__module__, func.__name__) job._args = args job._kwargs = kwargs job.description = job.get_call_string() return job + @property + def func_name(self): + return self._func_name + @property def func(self): - return self._func + import warnings + warnings.warn('Don\'t use this!', DeprecationWarning) + + func_name = self.func_name + if func_name is None: + return None + + module_name, func_name = func_name.rsplit('.', 1) + module = importlib.import_module(module_name) + return getattr(module, func_name) @property def args(self): @@ -69,7 +80,7 @@ class Job(object): def __init__(self, id=None): self._id = id self.created_at = times.now() - self._func = None + self._func_name = None self._args = None self._kwargs = None self.description = None @@ -111,7 +122,7 @@ class Job(object): def job_tuple(self): """Returns the job tuple that encodes the actual function call that this job represents.""" - return (self.func, self.args, self.kwargs) + return (self.func_name, self.args, self.kwargs) @property def return_value(self): @@ -160,7 +171,7 @@ class Job(object): else: return times.to_universal(date_str) - self._func, self._args, self._kwargs = unpickle(data) + self._func_name, self._args, self._kwargs = unpickle(data) self.created_at = to_date(created_at) self.origin = origin self.description = description @@ -180,7 +191,7 @@ class Job(object): obj = {} obj['created_at'] = times.format(self.created_at, 'UTC') - if self.func is not None: + if self.func_name is not None: obj['data'] = dumps(self.job_tuple) if self.origin is not None: obj['origin'] = self.origin @@ -227,13 +238,13 @@ class Job(object): """Returns a string representation of the call, formatted as a regular Python function invocation statement. """ - if self.func is None: + if self.func_name is None: return None arg_list = [repr(arg) for arg in self.args] arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] args = ', '.join(arg_list) - return '%s(%s)' % (self.func.__name__, args) + return '%s(%s)' % (self.func_name, args) def __str__(self): return '' % (self.id, self.description) diff --git a/rq/worker.py b/rq/worker.py index 142948a..92da8b0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -339,7 +339,7 @@ class Worker(object): inside the work horse's process. """ self.procline('Processing %s from %s since %s' % ( - job.func.__name__, + job.func_name, job.origin, time.time())) try: diff --git a/setup.py b/setup.py index 5263a04..b1672af 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_version(): raise RuntimeError('No version info found.') def get_dependencies(): - deps = ['redis', 'procname', 'times'] + deps = ['redis', 'procname', 'importlib', 'times'] deps += ['logbook'] # should be soft dependency? if sys.version_info < (2, 7) or \ sys.version_info >= (3, 0) and sys.version_info < (3, 2): diff --git a/tests/test_job.py b/tests/test_job.py index ff260d5..7dc2c22 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -58,20 +58,20 @@ class TestJob(RQTestCase): # Saving writes pickled job data unpickled_data = loads(self.testconn.hget(job.key, 'data')) - self.assertEquals(unpickled_data[0], some_calculation) + self.assertEquals(unpickled_data[0], 'tests.fixtures.some_calculation') def test_fetch(self): """Fetching jobs.""" # Prepare test self.testconn.hset('rq:job:some_id', 'data', - "(ctest_job\nsome_calculation\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa + "(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") # Fetch returns a job job = Job.fetch('some_id') self.assertEquals(job.id, 'some_id') - self.assertEquals(job.func, some_calculation) + self.assertEquals(job.func_name, 'tests.fixtures.some_calculation') self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) @@ -137,16 +137,3 @@ class TestJob(RQTestCase): self.testconn.hset(job.key, 'data', 'this is no pickle string') with self.assertRaises(UnpickleError): job.refresh() - - # Set up (part B) - job = Job.create(some_calculation, 3, 4, z=2) - job.save() - - # Now slightly modify the job to make it unpickl'able (this is - # equivalent to a worker not having the most up-to-date source code and - # unable to import the function) - data = self.testconn.hget(job.key, 'data') - unimportable_data = data.replace('some_calculation', 'broken') - self.testconn.hset(job.key, 'data', unimportable_data) - with self.assertRaises(UnpickleError): - job.refresh() diff --git a/tests/test_worker.py b/tests/test_worker.py index 13f3e8f..3eac3f4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -54,6 +54,24 @@ class TestWorker(RQTestCase): self.assertEquals(q.count, 0) self.assertEquals(failed_q.count, 1) + def test_work_is_unimportable(self): + """Jobs that cannot be imported are put on the failed queue.""" + q = Queue() + + job = q.enqueue(say_hello, 'Lionel') + job.save() + + # Now slightly modify the job to make it unimportable (this is + # equivalent to a worker not having the most up-to-date source code + # and unable to import the function) + data = self.testconn.hget(job.key, 'data') + unimportable_data = data.replace('say_hello', 'shut_up') + self.testconn.hset(job.key, 'data', unimportable_data) + + job.refresh() + with self.assertRaises((ImportError, AttributeError)): + job.func # accessing the func property should fail + def test_work_fails(self): """Failing jobs are put on the failed queue.""" q = Queue() @@ -128,10 +146,14 @@ class TestWorker(RQTestCase): w = Worker([q]) w.work(burst=True) + # First, assert that the job executed successfully + assert self.testconn.hget(job_with_rv.key, 'exc_info') is None + assert self.testconn.hget(job_without_rv.key, 'exc_info') is None + # Jobs with results expire after a certain TTL, while jobs without # results are immediately removed assert self.testconn.ttl(job_with_rv.key) > 0 - assert self.testconn.exists(job_without_rv.key) == False + assert not self.testconn.exists(job_without_rv.key) @slow # noqa