diff --git a/rq/job.py b/rq/job.py index 29d8d7d..095506b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,4 +1,5 @@ import importlib +import inspect import times from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError @@ -7,8 +8,8 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', - 'created_at', 'enqueued_at', 'connection', '_result', 'timeout', - '_kwargs', 'exc_info', '_id', 'data']) + 'created_at', 'enqueued_at', 'connection', '_result', + 'timeout', '_kwargs', 'exc_info', '_id', 'data']) def unpickle(pickled_string): @@ -55,7 +56,11 @@ class Job(object): """ connection = kwargs.pop('connection', None) job = cls(connection=connection) - job._func_name = '%s.%s' % (func.__module__, func.__name__) + if inspect.ismethod(func): + job._instance = func.im_self + job._func_name = func.__name__ + else: + job._func_name = '%s.%s' % (func.__module__, func.__name__) job._args = args job._kwargs = kwargs job.description = job.get_call_string() @@ -71,10 +76,17 @@ class Job(object): if func_name is None: return None + if self.instance: + return getattr(self.instance, func_name) + module_name, func_name = func_name.rsplit('.', 1) module = importlib.import_module(module_name) return getattr(module, func_name) + @property + def instance(self): + return self._instance + @property def args(self): return self._args @@ -105,6 +117,7 @@ class Job(object): self._id = id self.created_at = times.now() self._func_name = None + self._instance = None self._args = None self._kwargs = None self.description = None @@ -141,12 +154,11 @@ class Job(object): """The Redis key that is used to store job hash under.""" return self.key_for(self.id) - @property # noqa def job_tuple(self): """Returns the job tuple that encodes the actual function call that this job represents.""" - return (self.func_name, self.args, self.kwargs) + return (self.func_name, self.instance, self.args, self.kwargs) @property def return_value(self): @@ -190,7 +202,8 @@ class Job(object): return None else: return times.to_universal(date_str) - self._func_name, self._args, self._kwargs = unpickle(obj.get('data')) + + self._func_name, self._instance, self._args, self._kwargs = unpickle(obj.get('data')) # noqa self.created_at = to_date(obj.get('created_at')) self.origin = obj.get('origin') self.description = obj.get('description') @@ -199,9 +212,8 @@ class Job(object): self._result = obj.get('result') self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None - """ - Overwrite job's additional attributes (those not in JOB_ATTRS), if any. - """ + + # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) for attr in additional_attrs: setattr(self, attr, obj[attr]) @@ -233,12 +245,12 @@ class Job(object): Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data directly on ``Job`` instances. For example: - + job = Job.create(func) job.foo = 'bar' job.save() # Will persist the 'foo' attribute """ - additional_attrs = set(self.__dict__.keys()).difference(JOB_ATTRS) + additional_attrs = set(self.__dict__.keys()).difference(JOB_ATTRS) for attr in additional_attrs: obj[attr] = getattr(self, attr) self.connection.hmset(key, obj) diff --git a/rq/queue.py b/rq/queue.py index 0f07139..a7e5dfe 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -263,7 +263,7 @@ class FailedQueue(Queue): """ job.ended_at = times.now() job.exc_info = exc_info - return self.enqueue_job(job, set_meta_data=False) + return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False) def requeue(self, job_id): """Requeues the job with the given job ID.""" @@ -280,4 +280,4 @@ class FailedQueue(Queue): job.exc_info = None q = Queue(job.origin, connection=self.connection) - q.enqueue_job(job) + q.enqueue_job(job, timeout=job.timeout) diff --git a/tests/fixtures.py b/tests/fixtures.py index 36d0c8a..32003de 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -39,3 +39,12 @@ def create_file(path): def create_file_after_timeout(path, timeout): time.sleep(timeout) create_file(path) + + +class Calculator(object): + """Test instance methods.""" + def __init__(self, denominator): + self.denominator = denominator + + def calculate(x, y): + return x * y / self.denominator diff --git a/tests/test_job.py b/tests/test_job.py index 1715653..1a5916b 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,7 +1,7 @@ import times from datetime import datetime from tests import RQTestCase -from tests.fixtures import some_calculation, say_hello +from tests.fixtures import Calculator, some_calculation, say_hello from tests.helpers import strip_milliseconds from cPickle import loads from rq.job import Job @@ -19,6 +19,7 @@ class TestJob(RQTestCase): # ...and nothing else self.assertIsNone(job.func) + self.assertIsNone(job.instance) self.assertIsNone(job.args) self.assertIsNone(job.kwargs) self.assertIsNone(job.origin) @@ -35,6 +36,7 @@ class TestJob(RQTestCase): self.assertIsNotNone(job.id) self.assertIsNotNone(job.created_at) self.assertIsNotNone(job.description) + self.assertIsNone(job.instance) # Job data is set... self.assertEquals(job.func, some_calculation) @@ -46,6 +48,15 @@ class TestJob(RQTestCase): self.assertIsNone(job.enqueued_at) self.assertIsNone(job.return_value) + def test_create_instance_method_job(self): + """Creation of jobs for instance methods.""" + c = Calculator(2) + job = Job.create(c.calculate, 3, 4) + + # Job data is set + self.assertEquals(job.func, c.calculate) + self.assertEquals(job.instance, c) + self.assertEquals(job.args, (3, 4)) def test_save(self): # noqa """Storing jobs.""" @@ -64,7 +75,7 @@ class TestJob(RQTestCase): """Fetching jobs.""" # Prepare test self.testconn.hset('rq:job:some_id', 'data', - "(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa + "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") @@ -72,6 +83,7 @@ class TestJob(RQTestCase): job = Job.fetch('some_id') self.assertEquals(job.id, 'some_id') self.assertEquals(job.func_name, 'tests.fixtures.some_calculation') + self.assertIsNone(job.instance) self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) @@ -167,4 +179,4 @@ class TestJob(RQTestCase): job2 = Job.fetch(job.id) job2.refresh() - self.assertEqual(job2.foo, 'bar') \ No newline at end of file + self.assertEqual(job2.foo, 'bar') diff --git a/tests/test_queue.py b/tests/test_queue.py index a75abf5..fb6b1fc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,5 +1,5 @@ from tests import RQTestCase -from tests.fixtures import say_hello, div_by_zero +from tests.fixtures import Calculator, say_hello, div_by_zero from rq import Queue, get_failed_queue from rq.job import Job from rq.exceptions import InvalidJobOperationError @@ -132,6 +132,19 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) + def test_dequeue_instance_method(self): + """Dequeueing instance method jobs from queues.""" + q = Queue() + c = Calculator(2) + result = q.enqueue(c.calculate, 3, 4) + + job = q.dequeue() + # The instance has been pickled and unpickled, so it is now a separate + # object. Test for equality using each object's __dict__ instead. + self.assertEquals(job.instance.__dict__, c.__dict__) + self.assertEquals(job.func.__name__, 'calculate') + self.assertEquals(job.args, (3, 4)) + def test_dequeue_ignores_nonexisting_jobs(self): """Dequeuing silently ignores non-existing jobs.""" @@ -217,3 +230,25 @@ class TestFailedQueue(RQTestCase): # Assert that we cannot requeue a job that's not on the failed queue with self.assertRaises(InvalidJobOperationError): get_failed_queue().requeue(job.id) + + def test_quarantine_preserves_timeout(self): + """Quarantine preserves job timeout.""" + job = Job.create(div_by_zero, 1, 2, 3) + job.origin = 'fake' + job.timeout = 200 + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + + self.assertEquals(job.timeout, 200) + + def test_requeueing_preserves_timeout(self): + """Requeueing preserves job timeout.""" + job = Job.create(div_by_zero, 1, 2, 3) + job.origin = 'fake' + job.timeout = 200 + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + get_failed_queue().requeue(job.id) + + job = Job.fetch(job.id) + self.assertEquals(job.timeout, 200)