Merge with master.

main
Vincent Driessen 13 years ago
commit 604fce99a1

@ -1,4 +1,5 @@
import importlib import importlib
import inspect
import times import times
from uuid import uuid4 from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError from cPickle import loads, dumps, UnpicklingError
@ -7,8 +8,8 @@ from .exceptions import UnpickleError, NoSuchJobError
JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result', 'timeout', 'created_at', 'enqueued_at', 'connection', '_result',
'_kwargs', 'exc_info', '_id', 'data']) 'timeout', '_kwargs', 'exc_info', '_id', 'data'])
def unpickle(pickled_string): def unpickle(pickled_string):
@ -55,7 +56,11 @@ class Job(object):
""" """
connection = kwargs.pop('connection', None) connection = kwargs.pop('connection', None)
job = cls(connection=connection) job = cls(connection=connection)
job._func_name = '%s.%s' % (func.__module__, func.__name__) if inspect.ismethod(func):
job._instance = func.im_self
job._func_name = func.__name__
else:
job._func_name = '%s.%s' % (func.__module__, func.__name__)
job._args = args job._args = args
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = job.get_call_string()
@ -71,10 +76,17 @@ class Job(object):
if func_name is None: if func_name is None:
return None return None
if self.instance:
return getattr(self.instance, func_name)
module_name, func_name = func_name.rsplit('.', 1) module_name, func_name = func_name.rsplit('.', 1)
module = importlib.import_module(module_name) module = importlib.import_module(module_name)
return getattr(module, func_name) return getattr(module, func_name)
@property
def instance(self):
return self._instance
@property @property
def args(self): def args(self):
return self._args return self._args
@ -105,6 +117,7 @@ class Job(object):
self._id = id self._id = id
self.created_at = times.now() self.created_at = times.now()
self._func_name = None self._func_name = None
self._instance = None
self._args = None self._args = None
self._kwargs = None self._kwargs = None
self.description = None self.description = None
@ -141,12 +154,11 @@ class Job(object):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job hash under."""
return self.key_for(self.id) return self.key_for(self.id)
@property # noqa @property # noqa
def job_tuple(self): def job_tuple(self):
"""Returns the job tuple that encodes the actual function call that """Returns the job tuple that encodes the actual function call that
this job represents.""" this job represents."""
return (self.func_name, self.args, self.kwargs) return (self.func_name, self.instance, self.args, self.kwargs)
@property @property
def return_value(self): def return_value(self):
@ -190,7 +202,8 @@ class Job(object):
return None return None
else: else:
return times.to_universal(date_str) return times.to_universal(date_str)
self._func_name, self._args, self._kwargs = unpickle(obj.get('data'))
self._func_name, self._instance, self._args, self._kwargs = unpickle(obj.get('data')) # noqa
self.created_at = to_date(obj.get('created_at')) self.created_at = to_date(obj.get('created_at'))
self.origin = obj.get('origin') self.origin = obj.get('origin')
self.description = obj.get('description') self.description = obj.get('description')
@ -199,9 +212,8 @@ class Job(object):
self._result = obj.get('result') self._result = obj.get('result')
self.exc_info = obj.get('exc_info') self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
"""
Overwrite job's additional attributes (those not in JOB_ATTRS), if any. # Overwrite job's additional attrs (those not in JOB_ATTRS), if any
"""
additional_attrs = set(obj.keys()).difference(JOB_ATTRS) additional_attrs = set(obj.keys()).difference(JOB_ATTRS)
for attr in additional_attrs: for attr in additional_attrs:
setattr(self, attr, obj[attr]) setattr(self, attr, obj[attr])
@ -233,12 +245,12 @@ class Job(object):
Store additional attributes from job instance into Redis. This is done Store additional attributes from job instance into Redis. This is done
so that third party libraries using RQ can store additional data so that third party libraries using RQ can store additional data
directly on ``Job`` instances. For example: directly on ``Job`` instances. For example:
job = Job.create(func) job = Job.create(func)
job.foo = 'bar' job.foo = 'bar'
job.save() # Will persist the 'foo' attribute job.save() # Will persist the 'foo' attribute
""" """
additional_attrs = set(self.__dict__.keys()).difference(JOB_ATTRS) additional_attrs = set(self.__dict__.keys()).difference(JOB_ATTRS)
for attr in additional_attrs: for attr in additional_attrs:
obj[attr] = getattr(self, attr) obj[attr] = getattr(self, attr)
self.connection.hmset(key, obj) self.connection.hmset(key, obj)

@ -263,7 +263,7 @@ class FailedQueue(Queue):
""" """
job.ended_at = times.now() job.ended_at = times.now()
job.exc_info = exc_info job.exc_info = exc_info
return self.enqueue_job(job, set_meta_data=False) return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False)
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""
@ -280,4 +280,4 @@ class FailedQueue(Queue):
job.exc_info = None job.exc_info = None
q = Queue(job.origin, connection=self.connection) q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job) q.enqueue_job(job, timeout=job.timeout)

@ -39,3 +39,12 @@ def create_file(path):
def create_file_after_timeout(path, timeout): def create_file_after_timeout(path, timeout):
time.sleep(timeout) time.sleep(timeout)
create_file(path) create_file(path)
class Calculator(object):
"""Test instance methods."""
def __init__(self, denominator):
self.denominator = denominator
def calculate(x, y):
return x * y / self.denominator

@ -1,7 +1,7 @@
import times import times
from datetime import datetime from datetime import datetime
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import some_calculation, say_hello from tests.fixtures import Calculator, some_calculation, say_hello
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from cPickle import loads from cPickle import loads
from rq.job import Job from rq.job import Job
@ -19,6 +19,7 @@ class TestJob(RQTestCase):
# ...and nothing else # ...and nothing else
self.assertIsNone(job.func) self.assertIsNone(job.func)
self.assertIsNone(job.instance)
self.assertIsNone(job.args) self.assertIsNone(job.args)
self.assertIsNone(job.kwargs) self.assertIsNone(job.kwargs)
self.assertIsNone(job.origin) self.assertIsNone(job.origin)
@ -35,6 +36,7 @@ class TestJob(RQTestCase):
self.assertIsNotNone(job.id) self.assertIsNotNone(job.id)
self.assertIsNotNone(job.created_at) self.assertIsNotNone(job.created_at)
self.assertIsNotNone(job.description) self.assertIsNotNone(job.description)
self.assertIsNone(job.instance)
# Job data is set... # Job data is set...
self.assertEquals(job.func, some_calculation) self.assertEquals(job.func, some_calculation)
@ -46,6 +48,15 @@ class TestJob(RQTestCase):
self.assertIsNone(job.enqueued_at) self.assertIsNone(job.enqueued_at)
self.assertIsNone(job.return_value) self.assertIsNone(job.return_value)
def test_create_instance_method_job(self):
"""Creation of jobs for instance methods."""
c = Calculator(2)
job = Job.create(c.calculate, 3, 4)
# Job data is set
self.assertEquals(job.func, c.calculate)
self.assertEquals(job.instance, c)
self.assertEquals(job.args, (3, 4))
def test_save(self): # noqa def test_save(self): # noqa
"""Storing jobs.""" """Storing jobs."""
@ -64,7 +75,7 @@ class TestJob(RQTestCase):
"""Fetching jobs.""" """Fetching jobs."""
# Prepare test # Prepare test
self.testconn.hset('rq:job:some_id', 'data', self.testconn.hset('rq:job:some_id', 'data',
"(S'tests.fixtures.some_calculation'\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa
self.testconn.hset('rq:job:some_id', 'created_at', self.testconn.hset('rq:job:some_id', 'created_at',
"2012-02-07 22:13:24+0000") "2012-02-07 22:13:24+0000")
@ -72,6 +83,7 @@ class TestJob(RQTestCase):
job = Job.fetch('some_id') job = Job.fetch('some_id')
self.assertEquals(job.id, 'some_id') self.assertEquals(job.id, 'some_id')
self.assertEquals(job.func_name, 'tests.fixtures.some_calculation') self.assertEquals(job.func_name, 'tests.fixtures.some_calculation')
self.assertIsNone(job.instance)
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
@ -167,4 +179,4 @@ class TestJob(RQTestCase):
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
job2.refresh() job2.refresh()
self.assertEqual(job2.foo, 'bar') self.assertEqual(job2.foo, 'bar')

@ -1,5 +1,5 @@
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import say_hello, div_by_zero from tests.fixtures import Calculator, say_hello, div_by_zero
from rq import Queue, get_failed_queue from rq import Queue, get_failed_queue
from rq.job import Job from rq.job import Job
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
@ -132,6 +132,19 @@ class TestQueue(RQTestCase):
# ...and assert the queue count when down # ...and assert the queue count when down
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
def test_dequeue_instance_method(self):
"""Dequeueing instance method jobs from queues."""
q = Queue()
c = Calculator(2)
result = q.enqueue(c.calculate, 3, 4)
job = q.dequeue()
# The instance has been pickled and unpickled, so it is now a separate
# object. Test for equality using each object's __dict__ instead.
self.assertEquals(job.instance.__dict__, c.__dict__)
self.assertEquals(job.func.__name__, 'calculate')
self.assertEquals(job.args, (3, 4))
def test_dequeue_ignores_nonexisting_jobs(self): def test_dequeue_ignores_nonexisting_jobs(self):
"""Dequeuing silently ignores non-existing jobs.""" """Dequeuing silently ignores non-existing jobs."""
@ -217,3 +230,25 @@ class TestFailedQueue(RQTestCase):
# Assert that we cannot requeue a job that's not on the failed queue # Assert that we cannot requeue a job that's not on the failed queue
with self.assertRaises(InvalidJobOperationError): with self.assertRaises(InvalidJobOperationError):
get_failed_queue().requeue(job.id) get_failed_queue().requeue(job.id)
def test_quarantine_preserves_timeout(self):
"""Quarantine preserves job timeout."""
job = Job.create(div_by_zero, 1, 2, 3)
job.origin = 'fake'
job.timeout = 200
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
self.assertEquals(job.timeout, 200)
def test_requeueing_preserves_timeout(self):
"""Requeueing preserves job timeout."""
job = Job.create(div_by_zero, 1, 2, 3)
job.origin = 'fake'
job.timeout = 200
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error'))
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEquals(job.timeout, 200)

Loading…
Cancel
Save