diff --git a/rq/exceptions.py b/rq/exceptions.py index 6269e81..979d4f7 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,2 +1,5 @@ class NoQueueError(Exception): pass + +class DequeueError(Exception): + pass diff --git a/rq/queue.py b/rq/queue.py index 7614547..4e078b1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -2,6 +2,7 @@ import uuid from functools import total_ordering from pickle import loads, dumps from .proxy import conn +from .exceptions import DequeueError class DelayedResult(object): """Proxy object that is returned as a result of `Queue.enqueue()` calls. @@ -44,8 +45,11 @@ class Job(object): @classmethod def unpickle(cls, pickle_data): """Constructs a Job instance form the given pickle'd job tuple data.""" - job_tuple = loads(pickle_data) - return Job(job_tuple) + try: + job_tuple = loads(pickle_data) + return Job(job_tuple) + except (AttributeError, ValueError, IndexError): + raise DequeueError('Could not decode job tuple.') def __init__(self, job_tuple, origin=None): self.func, self.args, self.kwargs, self.rv_key = job_tuple diff --git a/tests/test_rq.py b/tests/test_rq.py index ada407c..aaf45ef 100644 --- a/tests/test_rq.py +++ b/tests/test_rq.py @@ -1,8 +1,9 @@ import unittest -from pickle import loads +from pickle import loads, dumps from redis import Redis from logbook import NullHandler from rq import conn, Queue, Worker +from rq.exceptions import DequeueError # Test data def testjob(name=None): @@ -135,6 +136,37 @@ class TestQueue(RQTestCase): self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') + def test_dequeue_unpicklable_data(self): + """Error handling of invalid pickle data.""" + + # Push non-pickle data on the queue + q = Queue('foo') + blob = 'this is nothing like pickled data' + self.testconn.rpush(q._key, blob) + + with self.assertRaises(DequeueError): + q.dequeue() # error occurs when perform()'ing + + # Push value pickle data, but not representing a job tuple + q = Queue('foo') + blob = dumps('this is not a job tuple') + self.testconn.rpush(q._key, blob) + + with self.assertRaises(DequeueError): + q.dequeue() # error occurs when perform()'ing + + # Push slightly incorrect pickled data onto the queue (simulate + # a function that can't be imported from the worker) + q = Queue('foo') + + job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused')) + blob = job_tuple.replace('testjob', 'fooobar') + self.testconn.rpush(q._key, blob) + + with self.assertRaises(DequeueError): + q.dequeue() # error occurs when dequeue()'ing + + class TestWorker(RQTestCase): def test_create_worker(self): """Worker creation."""