diff --git a/rq/exceptions.py b/rq/exceptions.py index 9880cd6..9d66d75 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -4,6 +4,3 @@ class NoQueueError(Exception): class UnpickleError(Exception): pass -class DequeueError(Exception): - pass - diff --git a/rq/job.py b/rq/job.py index ff9effd..07318e8 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,4 +1,6 @@ -from pickle import loads +from datetime import datetime +from uuid import uuid4 +from pickle import loads, dumps from .exceptions import UnpickleError @@ -9,17 +11,34 @@ class Job(object): def unpickle(cls, pickle_data): """Constructs a Job instance form the given pickle'd job tuple data.""" try: - return loads(pickle_data) - except (AttributeError, IndexError, TypeError): - raise UnpickleError('Could not decode job tuple.') + unpickled_obj = loads(pickle_data) + assert isinstance(unpickled_obj, Job) + return unpickled_obj + except (AssertionError, AttributeError, IndexError, TypeError): + raise UnpickleError('Could not unpickle Job.') def __init__(self, func, *args, **kwargs): + self._id = unicode(uuid4()) self.func = func self.args = args self.kwargs = kwargs - self.rv_key = None self.origin = None - self.timestamp = None + self.created_at = datetime.utcnow() + self.enqueued_at = None + + def pickle(self): + """Returns the pickle'd string represenation of a Job. Suitable for writing to Redis.""" + return dumps(self) + + @property + def rv_key(self): + """Returns the Redis key under which the Job's result will be stored, if applicable.""" + return 'rq:result:%s' % (self._id,) + + @property + def id(self): + """Returns the Job's internal ID.""" + return self._id def perform(self): """Invokes the job function with the job arguments. @@ -37,5 +56,7 @@ class Job(object): return '%s(%s)' % (self.func.__name__, ', '.join(arg_list)) def __str__(self): - return '' % self.call_string + return '' % (self.id, self.call_string) + def __eq__(self, other): + return cmp(self.id, other.id) diff --git a/rq/queue.py b/rq/queue.py index bdd37a0..9440f4f 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,6 +1,6 @@ -import uuid +from datetime import datetime from functools import total_ordering -from pickle import loads, dumps +from pickle import loads from .proxy import conn from .job import Job @@ -78,26 +78,45 @@ class Queue(object): @property def messages(self): - """Returns a list of all messages in the queue.""" + """Returns a list of all messages (pickled job data) in the queue.""" return conn.lrange(self.key, 0, -1) + @property + def jobs(self): + """Returns a list of all jobs in the queue.""" + return map(Job.unpickle, self.messages) + @property def count(self): """Returns a count of all messages in the queue.""" return conn.llen(self.key) + + def _create_job(self, f, *args, **kwargs): + """Creates a Job instance for the given function call and attaches queue + meta data to it. + """ + if f.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + + job = Job(f, *args, **kwargs) + job.origin = self.name + return job + def enqueue(self, f, *args, **kwargs): """Enqueues a function call for delayed execution. Expects the function to call, along with the arguments and keyword arguments. """ - rv_key = 'rq:result:%s:%s' % (self.name, str(uuid.uuid4())) - if f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed by workers.') - message = dumps((f, args, kwargs, rv_key)) - conn.rpush(self.key, message) - return DelayedResult(rv_key) + job = self._create_job(f, *args, **kwargs) + job.enqueued_at = datetime.utcnow() + conn.rpush(self.key, job.pickle()) + return DelayedResult(job.rv_key) + + def requeue(self, job): + """Requeues an existing (typically a failed job) onto the queue.""" + pass def dequeue(self): """Dequeues the function call at the front of this Queue. diff --git a/tests/__init__.py b/tests/__init__.py index c22ab4b..1ecf83b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,4 @@ import unittest -from pickle import loads from redis import Redis from logbook import NullHandler from rq import conn @@ -53,9 +52,8 @@ class RQTestCase(unittest.TestCase): def assertQueueContains(self, queue, that_func): # Do a queue scan (this is O(n), but we're in a test, so hey) - for message in queue.messages: - f, _, args, kwargs = loads(message) - if f == that_func: + for job in queue.jobs: + if job.func == that_func: return self.fail('Queue %s does not contain message for function %s' % (queue.key, that_func)) diff --git a/tests/test_job.py b/tests/test_job.py index 460f015..de6c78e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -17,8 +17,9 @@ class TestJob(RQTestCase): self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, {'z': 2}) self.assertIsNone(job.origin) - self.assertIsNone(job.timestamp) - self.assertIsNone(job.rv_key) + self.assertIsNotNone(job.created_at) + self.assertIsNone(job.enqueued_at) + self.assertIsNotNone(job.rv_key) def test_pickle_job(self): """Pickling of jobs.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 7b49af0..415f58d 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -2,7 +2,7 @@ from tests import RQTestCase from tests import testjob from pickle import dumps from rq import Queue -from rq.exceptions import DequeueError +from rq.exceptions import UnpickleError class TestQueue(RQTestCase): @@ -96,15 +96,15 @@ class TestQueue(RQTestCase): blob = 'this is nothing like pickled data' self.testconn.rpush(q._key, blob) - with self.assertRaises(DequeueError): + with self.assertRaises(UnpickleError): q.dequeue() # error occurs when perform()'ing # Push value pickle data, but not representing a job tuple q = Queue('foo') - blob = dumps('this is not a job tuple') + blob = dumps('this is pickled, but not a job tuple') self.testconn.rpush(q._key, blob) - with self.assertRaises(DequeueError): + with self.assertRaises(UnpickleError): q.dequeue() # error occurs when perform()'ing # Push slightly incorrect pickled data onto the queue (simulate @@ -115,6 +115,6 @@ class TestQueue(RQTestCase): blob = job_tuple.replace('testjob', 'fooobar') self.testconn.rpush(q._key, blob) - with self.assertRaises(DequeueError): + with self.assertRaises(UnpickleError): q.dequeue() # error occurs when dequeue()'ing