Change the way jobs are pickled.

There is no job tuple anymore, but instead Jobs are picklable by
themselves natively.  Furthermore, I've added a way to annotate Jobs
with created_at and enqueued_at timestamps, to drive any future Job
performance stats.  (And to enable requeueing, while keeping hold of the
queue that the Job originated from.)

This fixes #17.
main
Vincent Driessen 13 years ago
parent 0503eb2829
commit 0be1cb6ac0

@ -4,6 +4,3 @@ class NoQueueError(Exception):
class UnpickleError(Exception): class UnpickleError(Exception):
pass pass
class DequeueError(Exception):
pass

@ -1,4 +1,6 @@
from pickle import loads from datetime import datetime
from uuid import uuid4
from pickle import loads, dumps
from .exceptions import UnpickleError from .exceptions import UnpickleError
@ -9,17 +11,34 @@ class Job(object):
def unpickle(cls, pickle_data): def unpickle(cls, pickle_data):
"""Constructs a Job instance form the given pickle'd job tuple data.""" """Constructs a Job instance form the given pickle'd job tuple data."""
try: try:
return loads(pickle_data) unpickled_obj = loads(pickle_data)
except (AttributeError, IndexError, TypeError): assert isinstance(unpickled_obj, Job)
raise UnpickleError('Could not decode job tuple.') return unpickled_obj
except (AssertionError, AttributeError, IndexError, TypeError):
raise UnpickleError('Could not unpickle Job.')
def __init__(self, func, *args, **kwargs): def __init__(self, func, *args, **kwargs):
self._id = unicode(uuid4())
self.func = func self.func = func
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.rv_key = None
self.origin = None self.origin = None
self.timestamp = None self.created_at = datetime.utcnow()
self.enqueued_at = None
def pickle(self):
"""Returns the pickle'd string represenation of a Job. Suitable for writing to Redis."""
return dumps(self)
@property
def rv_key(self):
"""Returns the Redis key under which the Job's result will be stored, if applicable."""
return 'rq:result:%s' % (self._id,)
@property
def id(self):
"""Returns the Job's internal ID."""
return self._id
def perform(self): def perform(self):
"""Invokes the job function with the job arguments. """Invokes the job function with the job arguments.
@ -37,5 +56,7 @@ class Job(object):
return '%s(%s)' % (self.func.__name__, ', '.join(arg_list)) return '%s(%s)' % (self.func.__name__, ', '.join(arg_list))
def __str__(self): def __str__(self):
return '<Job %s>' % self.call_string return '<Job %s: %s>' % (self.id, self.call_string)
def __eq__(self, other):
return cmp(self.id, other.id)

@ -1,6 +1,6 @@
import uuid from datetime import datetime
from functools import total_ordering from functools import total_ordering
from pickle import loads, dumps from pickle import loads
from .proxy import conn from .proxy import conn
from .job import Job from .job import Job
@ -78,26 +78,45 @@ class Queue(object):
@property @property
def messages(self): def messages(self):
"""Returns a list of all messages in the queue.""" """Returns a list of all messages (pickled job data) in the queue."""
return conn.lrange(self.key, 0, -1) return conn.lrange(self.key, 0, -1)
@property
def jobs(self):
"""Returns a list of all jobs in the queue."""
return map(Job.unpickle, self.messages)
@property @property
def count(self): def count(self):
"""Returns a count of all messages in the queue.""" """Returns a count of all messages in the queue."""
return conn.llen(self.key) return conn.llen(self.key)
def _create_job(self, f, *args, **kwargs):
"""Creates a Job instance for the given function call and attaches queue
meta data to it.
"""
if f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed by workers.')
job = Job(f, *args, **kwargs)
job.origin = self.name
return job
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Enqueues a function call for delayed execution. """Enqueues a function call for delayed execution.
Expects the function to call, along with the arguments and keyword Expects the function to call, along with the arguments and keyword
arguments. arguments.
""" """
rv_key = 'rq:result:%s:%s' % (self.name, str(uuid.uuid4())) job = self._create_job(f, *args, **kwargs)
if f.__module__ == '__main__': job.enqueued_at = datetime.utcnow()
raise ValueError('Functions from the __main__ module cannot be processed by workers.') conn.rpush(self.key, job.pickle())
message = dumps((f, args, kwargs, rv_key)) return DelayedResult(job.rv_key)
conn.rpush(self.key, message)
return DelayedResult(rv_key) def requeue(self, job):
"""Requeues an existing (typically a failed job) onto the queue."""
pass
def dequeue(self): def dequeue(self):
"""Dequeues the function call at the front of this Queue. """Dequeues the function call at the front of this Queue.

@ -1,5 +1,4 @@
import unittest import unittest
from pickle import loads
from redis import Redis from redis import Redis
from logbook import NullHandler from logbook import NullHandler
from rq import conn from rq import conn
@ -53,9 +52,8 @@ class RQTestCase(unittest.TestCase):
def assertQueueContains(self, queue, that_func): def assertQueueContains(self, queue, that_func):
# Do a queue scan (this is O(n), but we're in a test, so hey) # Do a queue scan (this is O(n), but we're in a test, so hey)
for message in queue.messages: for job in queue.jobs:
f, _, args, kwargs = loads(message) if job.func == that_func:
if f == that_func:
return return
self.fail('Queue %s does not contain message for function %s' % self.fail('Queue %s does not contain message for function %s' %
(queue.key, that_func)) (queue.key, that_func))

@ -17,8 +17,9 @@ class TestJob(RQTestCase):
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, {'z': 2}) self.assertEquals(job.kwargs, {'z': 2})
self.assertIsNone(job.origin) self.assertIsNone(job.origin)
self.assertIsNone(job.timestamp) self.assertIsNotNone(job.created_at)
self.assertIsNone(job.rv_key) self.assertIsNone(job.enqueued_at)
self.assertIsNotNone(job.rv_key)
def test_pickle_job(self): def test_pickle_job(self):
"""Pickling of jobs.""" """Pickling of jobs."""

@ -2,7 +2,7 @@ from tests import RQTestCase
from tests import testjob from tests import testjob
from pickle import dumps from pickle import dumps
from rq import Queue from rq import Queue
from rq.exceptions import DequeueError from rq.exceptions import UnpickleError
class TestQueue(RQTestCase): class TestQueue(RQTestCase):
@ -96,15 +96,15 @@ class TestQueue(RQTestCase):
blob = 'this is nothing like pickled data' blob = 'this is nothing like pickled data'
self.testconn.rpush(q._key, blob) self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError): with self.assertRaises(UnpickleError):
q.dequeue() # error occurs when perform()'ing q.dequeue() # error occurs when perform()'ing
# Push value pickle data, but not representing a job tuple # Push value pickle data, but not representing a job tuple
q = Queue('foo') q = Queue('foo')
blob = dumps('this is not a job tuple') blob = dumps('this is pickled, but not a job tuple')
self.testconn.rpush(q._key, blob) self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError): with self.assertRaises(UnpickleError):
q.dequeue() # error occurs when perform()'ing q.dequeue() # error occurs when perform()'ing
# Push slightly incorrect pickled data onto the queue (simulate # Push slightly incorrect pickled data onto the queue (simulate
@ -115,6 +115,6 @@ class TestQueue(RQTestCase):
blob = job_tuple.replace('testjob', 'fooobar') blob = job_tuple.replace('testjob', 'fooobar')
self.testconn.rpush(q._key, blob) self.testconn.rpush(q._key, blob)
with self.assertRaises(DequeueError): with self.assertRaises(UnpickleError):
q.dequeue() # error occurs when dequeue()'ing q.dequeue() # error occurs when dequeue()'ing

Loading…
Cancel
Save