CHECKPOINT: Initial part of the big refactor.

main
Vincent Driessen 13 years ago
parent 7fff52d99c
commit 65105b44c3

@ -1,3 +1,6 @@
class NoSuchJobError(Exception):
pass
class NoQueueError(Exception): class NoQueueError(Exception):
pass pass

@ -1,51 +1,131 @@
from datetime import datetime import times
from uuid import uuid4 from uuid import uuid4
from pickle import loads, dumps from pickle import loads, dumps
from .exceptions import UnpickleError from .proxy import conn
from .exceptions import UnpickleError, NoSuchJobError
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
# Job construction
@classmethod @classmethod
def unpickle(cls, pickle_data): def for_call(cls, func, *args, **kwargs):
"""Constructs a Job instance form the given pickle'd job tuple data.""" """Creates a new Job instance for the given function, arguments, and
try: keyword arguments.
unpickled_obj = loads(pickle_data) """
assert isinstance(unpickled_obj, Job) job = Job()
return unpickled_obj job.func = func
except (AssertionError, AttributeError, IndexError, TypeError, KeyError): job.args = args
raise UnpickleError('Could not unpickle Job.', pickle_data) job.kwargs = kwargs
return job
def __init__(self, func, *args, **kwargs): @classmethod
self._id = unicode(uuid4()) def fetch(cls, id):
self.func = func """Fetches a persisted job from its corresponding Redis key and
self.args = args instantiates it.
self.kwargs = kwargs """
job = Job(id)
job.refresh()
return job
def __init__(self, id=None):
self._id = id
self.func = None
self.args = None
self.kwargs = None
self.origin = None self.origin = None
self.created_at = datetime.utcnow() self.created_at = times.now()
self.enqueued_at = None self.enqueued_at = None
self.result = None
self.exc_info = None self.exc_info = None
def pickle(self):
"""Returns the pickle'd string represenation of a Job. Suitable for writing to Redis.""" # Data access
return dumps(self) def get_id(self):
"""The job ID for this job instance. Generates an ID lazily the
first time the ID is requested.
"""
if self._id is None:
self._id = unicode(uuid4())
return self._id
def set_id(self, value):
"""Sets a job ID for the given job."""
self._id = value
id = property(get_id, set_id)
@property @property
def rv_key(self): def key(self):
"""Returns the Redis key under which the Job's result will be stored, if applicable.""" """The Redis key that is used to store job data under."""
return 'rq:result:%s' % (self._id,) return 'rq:job:%s' % (self.id,)
@property @property
def id(self): def job_tuple(self):
"""Returns the Job's internal ID.""" """Returns the job tuple that encodes the actual function call that this job represents."""
return self._id return (self.func, self.args, self.kwargs)
@property
def return_value(self):
"""Returns the return value of the job.
Initially, right after enqueueing a job, the return value will be None.
But when the job has been executed, and had a return value or exception,
this will return that value or exception.
Note that, when the job has no return value (i.e. returns None), the
ReadOnlyJob object is useless, as the result won't be written back to
Redis.
Also note that you cannot draw the conclusion that a job has _not_ been
executed when its return value is None, since return values written back
to Redis will expire after a given amount of time (500 seconds by
default).
"""
if self._cached_result is None:
rv = conn.hget(self.key, 'result')
if rv is not None:
# cache the result
self._cached_result = loads(rv)
return self._cached_result
# Persistence
def refresh(self):
"""Overwrite the current instance's properties with the values in the
corresponding Redis key.
Will raise a NoSuchJobError if no corresponding Redis key exists.
"""
key = self.key
pickled_data = conn.hget(key, 'data')
if pickled_data is None:
raise NoSuchJobError('No such job: %s' % (key,))
self.func, self.args, self.kwargs = loads(pickled_data)
self.created_at = times.to_universal(conn.hget(key, 'created_at'))
def save(self):
"""Persists the current job instance to its corresponding Redis key."""
pickled_data = dumps(self.job_tuple)
key = self.key
conn.hset(key, 'data', pickled_data)
conn.hset(key, 'created_at', times.format(self.created_at, 'UTC'))
# Job execution
def perform(self): def perform(self):
"""Invokes the job function with the job arguments. """Invokes the job function with the job arguments.
""" """
return self.func(*self.args, **self.kwargs) return self.func(*self.args, **self.kwargs)
# Representation
@property @property
def call_string(self): def call_string(self):
"""Returns a string representation of the call, formatted as a regular """Returns a string representation of the call, formatted as a regular
@ -59,5 +139,31 @@ class Job(object):
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.call_string) return '<Job %s: %s>' % (self.id, self.call_string)
# Job equality
def __eq__(self, other): def __eq__(self, other):
return cmp(self.id, other.id) return self.id == other.id
def __hash__(self):
return hash(self.id)
# TODO: TO REFACTOR / REMOVE
def pickle(self):
"""Returns the pickle'd string represenation of a Job. Suitable for
writing to Redis.
"""
return dumps(self)
@classmethod
def unpickle(cls, pickle_data):
"""Constructs a Job instance form the given pickle'd job tuple data."""
try:
unpickled_obj = loads(pickle_data)
assert isinstance(unpickled_obj, Job)
return unpickled_obj
except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
raise UnpickleError('Could not unpickle Job.', pickle_data)

@ -1,44 +1,10 @@
from datetime import datetime from datetime import datetime
from functools import total_ordering from functools import total_ordering
from pickle import loads
from .proxy import conn from .proxy import conn
from .job import Job from .job import Job
from .exceptions import UnpickleError from .exceptions import UnpickleError
class DelayedResult(object):
"""Proxy object that is returned as a result of `Queue.enqueue()` calls.
Instances of DelayedResult can be polled for their return values.
"""
def __init__(self, key):
self.key = key
self._rv = None
@property
def return_value(self):
"""Returns the return value of the job.
Initially, right after enqueueing a job, the return value will be None.
But when the job has been executed, and had a return value or exception,
this will return that value or exception.
Note that, when the job has no return value (i.e. returns None), the
DelayedResult object is useless, as the result won't be written back to
Redis.
Also note that you cannot draw the conclusion that a job has _not_ been
executed when its return value is None, since return values written back
to Redis will expire after a given amount of time (500 seconds by
default).
"""
if self._rv is None:
rv = conn.get(self.key)
if rv is not None:
# cache the result
self._rv = loads(rv)
return self._rv
@total_ordering @total_ordering
class Queue(object): class Queue(object):
redis_queue_namespace_prefix = 'rq:queue:' redis_queue_namespace_prefix = 'rq:queue:'
@ -99,13 +65,14 @@ class Queue(object):
if f.__module__ == '__main__': if f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed by workers.') raise ValueError('Functions from the __main__ module cannot be processed by workers.')
job = Job(f, *args, **kwargs) job = Job.for_call(f, *args, **kwargs)
job.origin = self.name job.origin = self.name
return job return job
def _push(self, pickled_job): def enqueue_job(self, job):
"""Enqueues a pickled_job on the corresponding Redis queue.""" """Enqueues a pickled_job on the corresponding Redis queue."""
conn.rpush(self.key, pickled_job) job.save()
conn.rpush(self.key, job.id)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Enqueues a function call for delayed execution. """Enqueues a function call for delayed execution.
@ -115,8 +82,8 @@ class Queue(object):
""" """
job = self._create_job(f, *args, **kwargs) job = self._create_job(f, *args, **kwargs)
job.enqueued_at = datetime.utcnow() job.enqueued_at = datetime.utcnow()
self._push(job.pickle()) self.enqueue_job(job)
return DelayedResult(job.rv_key) return Job(job.id)
def requeue(self, job): def requeue(self, job):
"""Requeues an existing (typically a failed job) onto the queue.""" """Requeues an existing (typically a failed job) onto the queue."""

@ -15,7 +15,7 @@ def get_version():
raise RuntimeError('No version info found.') raise RuntimeError('No version info found.')
def get_dependencies(): def get_dependencies():
deps = ['redis', 'procname'] deps = ['redis', 'procname', 'times']
deps += ['logbook'] # should be soft dependency? deps += ['logbook'] # should be soft dependency?
if sys.version_info < (2, 7) or \ if sys.version_info < (2, 7) or \
sys.version_info >= (3, 0) and sys.version_info < (3, 2): sys.version_info >= (3, 0) and sys.version_info < (3, 2):

@ -1,8 +1,8 @@
from datetime import datetime
from tests import RQTestCase from tests import RQTestCase
from pickle import dumps, loads from pickle import dumps, loads
from rq.job import Job from rq.job import Job
#from rq import Queue, Worker from rq.exceptions import NoSuchJobError, UnpickleError
from rq.exceptions import UnpickleError
def arbitrary_function(x, y, z=1): def arbitrary_function(x, y, z=1):
@ -10,25 +10,91 @@ def arbitrary_function(x, y, z=1):
class TestJob(RQTestCase): class TestJob(RQTestCase):
def test_create_job(self): def test_create_empty_job(self):
"""Creation of jobs.""" """Creation of new empty jobs."""
job = Job(arbitrary_function, 3, 4, z=2) job = Job()
# Jobs have a random UUID
self.assertIsNotNone(job.id)
# Jobs have no data yet...
self.assertEquals(job.func, None)
self.assertEquals(job.args, None)
self.assertEquals(job.kwargs, None)
self.assertEquals(job.origin, None)
self.assertEquals(job.enqueued_at, None)
self.assertEquals(job.result, None)
self.assertEquals(job.exc_info, None)
# ...except for a created_at property
self.assertIsNotNone(job.created_at)
def test_create_normal_job(self):
"""Creation of jobs for function calls."""
job = Job.for_call(arbitrary_function, 3, 4, z=2)
# Jobs have a random UUID
self.assertIsNotNone(job.id)
self.assertIsNotNone(job.created_at)
# Job data is set...
self.assertEquals(job.func, arbitrary_function) self.assertEquals(job.func, arbitrary_function)
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, {'z': 2}) self.assertEquals(job.kwargs, {'z': 2})
# ...but metadata is not
self.assertIsNone(job.origin) self.assertIsNone(job.origin)
self.assertIsNotNone(job.created_at)
self.assertIsNone(job.enqueued_at) self.assertIsNone(job.enqueued_at)
self.assertIsNotNone(job.rv_key) self.assertIsNone(job.result)
def test_save(self):
"""Storing jobs."""
job = Job.for_call(arbitrary_function, 3, 4, z=2)
def test_pickle_job(self): # Saving creates a Redis hash
"""Pickling of jobs.""" self.assertEquals(self.testconn.exists(job.key), False)
job = Job(arbitrary_function, 3, 4, z=2) job.save()
job2 = loads(dumps(job)) self.assertEquals(self.testconn.type(job.key), 'hash')
# Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data'))
self.assertEquals(unpickled_data[0], arbitrary_function)
def test_fetch(self):
"""Fetching jobs."""
# Prepare test
self.testconn.hset('rq:job:some_id', 'data', "(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.")
self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000")
# Fetch returns a job
job = Job.fetch('some_id')
self.assertEquals(job.id, 'some_id')
self.assertEquals(job.func, arbitrary_function)
self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
def test_persistence_of_jobs(self):
"""Storing and fetching of jobs."""
job = Job.for_call(arbitrary_function, 3, 4, z=2)
job.save()
job2 = Job.fetch(job.id)
self.assertEquals(job.func, job2.func) self.assertEquals(job.func, job2.func)
self.assertEquals(job.args, job2.args) self.assertEquals(job.args, job2.args)
self.assertEquals(job.kwargs, job2.kwargs) self.assertEquals(job.kwargs, job2.kwargs)
# Mathematical equation
self.assertEquals(job, job2)
def test_fetching_can_fail(self):
"""Fetching fails for non-existing jobs."""
with self.assertRaises(NoSuchJobError):
Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
def test_unpickle_errors(self): def test_unpickle_errors(self):
"""Handling of unpickl'ing errors.""" """Handling of unpickl'ing errors."""
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
@ -37,7 +103,7 @@ class TestJob(RQTestCase):
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
Job.unpickle(13) Job.unpickle(13)
pickle_data = dumps(Job(arbitrary_function, 2, 3)) pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3))
corrupt_data = pickle_data.replace('arbitrary', 'b0rken') corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
Job.unpickle(corrupt_data) Job.unpickle(corrupt_data)

@ -31,26 +31,26 @@ class TestQueue(RQTestCase):
def test_queue_empty(self): def test_queue_empty(self):
"""Detecting empty queues.""" """Detecting empty queues."""
q = Queue('my-queue') q = Queue('example')
self.assertEquals(q.is_empty(), True) self.assertEquals(q.is_empty(), True)
self.testconn.rpush('rq:queue:my-queue', 'some val') self.testconn.rpush('rq:queue:example', 'sentinel message')
self.assertEquals(q.is_empty(), False) self.assertEquals(q.is_empty(), False)
def test_enqueue(self): def test_enqueue(self):
"""Putting work on queues.""" """Enqueueing writes job IDs to queues."""
q = Queue('my-queue') q = Queue()
self.assertEquals(q.is_empty(), True) self.assertEquals(q.is_empty(), True)
# testjob spec holds which queue this is sent to # testjob spec holds which queue this is sent to
q.enqueue(testjob, 'Nick', foo='bar') job = q.enqueue(testjob, 'Nick', foo='bar')
self.assertEquals(q.is_empty(), False) job_id = job.id
for job in q.jobs:
if job.func == testjob: # Inspect data inside Redis
break q_key = 'rq:queue:default'
else: self.assertEquals(self.testconn.llen(q_key), 1)
self.fail('Job not found on queue.') self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
def test_dequeue(self): def test_dequeue(self):

@ -31,7 +31,7 @@ class TestWorker(RQTestCase):
# NOTE: We have to fake this enqueueing for this test case. # NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not # What we're simulating here is a call to a function that is not
# importable from the worker process. # importable from the worker process.
job = Job(failing_job, 3) job = Job.for_call(failing_job, 3)
pickled_job = job.pickle() pickled_job = job.pickle()
invalid_data = pickled_job.replace( invalid_data = pickled_job.replace(
'failing_job', 'nonexisting_job') 'failing_job', 'nonexisting_job')

Loading…
Cancel
Save