Put unreadable tasks on the failure queue.

main
Vincent Driessen 13 years ago
parent 0be1cb6ac0
commit 7eb8d92605

@ -2,5 +2,7 @@ class NoQueueError(Exception):
pass pass
class UnpickleError(Exception): class UnpickleError(Exception):
pass def __init__(self, message, raw_data):
super(UnpickleError, self).__init__(message)
self.raw_data = raw_data

@ -15,7 +15,7 @@ class Job(object):
assert isinstance(unpickled_obj, Job) assert isinstance(unpickled_obj, Job)
return unpickled_obj return unpickled_obj
except (AssertionError, AttributeError, IndexError, TypeError): except (AssertionError, AttributeError, IndexError, TypeError):
raise UnpickleError('Could not unpickle Job.') raise UnpickleError('Could not unpickle Job.', pickle_data)
def __init__(self, func, *args, **kwargs): def __init__(self, func, *args, **kwargs):
self._id = unicode(uuid4()) self._id = unicode(uuid4())

@ -3,6 +3,7 @@ from functools import total_ordering
from pickle import loads from pickle import loads
from .proxy import conn from .proxy import conn
from .job import Job from .job import Job
from .exceptions import UnpickleError
class DelayedResult(object): class DelayedResult(object):
@ -103,6 +104,10 @@ class Queue(object):
job.origin = self.name job.origin = self.name
return job return job
def _push(self, pickled_job):
"""Enqueues a pickled_job on the corresponding Redis queue."""
conn.rpush(self.key, pickled_job)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Enqueues a function call for delayed execution. """Enqueues a function call for delayed execution.
@ -111,7 +116,7 @@ class Queue(object):
""" """
job = self._create_job(f, *args, **kwargs) job = self._create_job(f, *args, **kwargs)
job.enqueued_at = datetime.utcnow() job.enqueued_at = datetime.utcnow()
conn.rpush(self.key, job.pickle()) self._push(job.pickle())
return DelayedResult(job.rv_key) return DelayedResult(job.rv_key)
def requeue(self, job): def requeue(self, job):
@ -126,7 +131,13 @@ class Queue(object):
blob = conn.lpop(self.key) blob = conn.lpop(self.key)
if blob is None: if blob is None:
return None return None
job = Job.unpickle(blob) try:
job = Job.unpickle(blob)
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
e.queue = self
raise e
job.origin = self job.origin = self
return job return job
@ -162,8 +173,14 @@ class Queue(object):
return None return None
queue_key, blob = redis_result queue_key, blob = redis_result
job = Job.unpickle(blob)
queue = Queue.from_queue_key(queue_key) queue = Queue.from_queue_key(queue_key)
try:
job = Job.unpickle(blob)
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
e.queue = queue
raise e
job.origin = queue job.origin = queue
return job return job

@ -14,7 +14,7 @@ except ImportError:
from logging import Logger from logging import Logger
from .queue import Queue from .queue import Queue
from .proxy import conn from .proxy import conn
from .exceptions import NoQueueError from .exceptions import NoQueueError, UnpickleError
def iterable(x): def iterable(x):
return hasattr(x, '__iter__') return hasattr(x, '__iter__')
@ -34,6 +34,7 @@ def signal_name(signum):
except KeyError: except KeyError:
return 'SIG_UNKNOWN' return 'SIG_UNKNOWN'
class Worker(object): class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:' redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers' redis_workers_keys = 'rq:workers'
@ -249,7 +250,18 @@ class Worker(object):
self.procline('Listening on %s' % (','.join(qnames))) self.procline('Listening on %s' % (','.join(qnames)))
self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
wait_for_job = not burst wait_for_job = not burst
job = Queue.dequeue_any(self.queues, wait_for_job) try:
job = Queue.dequeue_any(self.queues, wait_for_job)
except UnpickleError as e:
self.log.warning('*** Ignoring unpickleable data on %s.', e.queue)
self.log.debug('Data follows:')
self.log.debug(e.raw_data)
self.log.debug('End of unreadable data.')
q = Queue('failure')
q._push(e.raw_data)
continue
if job is None: if job is None:
break break
self.state = 'busy' self.state = 'busy'

@ -9,6 +9,10 @@ def testjob(name=None):
name = 'Stranger' name = 'Stranger'
return 'Hi there, %s!' % (name,) return 'Hi there, %s!' % (name,)
def failing_job(x):
# Will throw a division-by-zero error
return x / 0
class RQTestCase(unittest.TestCase): class RQTestCase(unittest.TestCase):
"""Base class to inherit test cases from for RQ. """Base class to inherit test cases from for RQ.

@ -48,6 +48,7 @@ class TestQueue(RQTestCase):
self.assertEquals(q.empty, False) self.assertEquals(q.empty, False)
self.assertQueueContains(q, testjob) self.assertQueueContains(q, testjob)
def test_dequeue(self): def test_dequeue(self):
"""Fetching work from specific queue.""" """Fetching work from specific queue."""
q = Queue('foo') q = Queue('foo')
@ -60,7 +61,6 @@ class TestQueue(RQTestCase):
self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.args[0], 'Rick')
self.assertEquals(job.kwargs['foo'], 'bar') self.assertEquals(job.kwargs['foo'], 'bar')
def test_dequeue_any(self): def test_dequeue_any(self):
"""Fetching work from any given queue.""" """Fetching work from any given queue."""
fooq = Queue('foo') fooq = Queue('foo')
@ -87,7 +87,6 @@ class TestQueue(RQTestCase):
self.assertEquals(job.origin, barq) self.assertEquals(job.origin, barq)
self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
def test_dequeue_unpicklable_data(self): def test_dequeue_unpicklable_data(self):
"""Error handling of invalid pickle data.""" """Error handling of invalid pickle data."""

@ -1,6 +1,7 @@
from tests import RQTestCase from tests import RQTestCase
from tests import testjob from tests import testjob, failing_job
from rq import Queue, Worker from rq import Queue, Worker
from rq.job import Job
class TestWorker(RQTestCase): class TestWorker(RQTestCase):
@ -19,4 +20,33 @@ class TestWorker(RQTestCase):
fooq.enqueue(testjob, name='Frank') fooq.enqueue(testjob, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
def test_work_is_unreadable(self):
"""Worker processes unreadable job."""
q = Queue()
failure_q = Queue('failure')
self.assertEquals(failure_q.count, 0)
self.assertEquals(q.count, 0)
# NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not
# importable from the worker process.
job = Job(failing_job, 3)
pickled_job = job.pickle()
invalid_data = pickled_job.replace(
'failing_job', 'nonexisting_job')
# We use the low-level internal function to enqueue any data (bypassing
# validity checks)
q._push(invalid_data)
self.assertEquals(q.count, 1)
# All set, we're going to process it
w = Worker([q])
w.work(burst=True) # should silently pass
self.assertEquals(q.count, 0)
self.assertEquals(failure_q.count, 1)

Loading…
Cancel
Save