Putting failed jobs on the failure queue.

main
Vincent Driessen 13 years ago
parent 7eb8d92605
commit fdce187c27

@ -14,7 +14,7 @@ class Job(object):
unpickled_obj = loads(pickle_data) unpickled_obj = loads(pickle_data)
assert isinstance(unpickled_obj, Job) assert isinstance(unpickled_obj, Job)
return unpickled_obj return unpickled_obj
except (AssertionError, AttributeError, IndexError, TypeError): except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
raise UnpickleError('Could not unpickle Job.', pickle_data) raise UnpickleError('Could not unpickle Job.', pickle_data)
def __init__(self, func, *args, **kwargs): def __init__(self, func, *args, **kwargs):

@ -1,6 +1,7 @@
import sys import sys
import os import os
import errno import errno
import datetime
import random import random
import time import time
import procname import procname
@ -81,6 +82,7 @@ class Worker(object):
self._horse_pid = 0 self._horse_pid = 0
self._stopped = False self._stopped = False
self.log = Logger('worker') self.log = Logger('worker')
self.failure_queue = Queue('failure')
def validate_queues(self): def validate_queues(self):
@ -253,13 +255,13 @@ class Worker(object):
try: try:
job = Queue.dequeue_any(self.queues, wait_for_job) job = Queue.dequeue_any(self.queues, wait_for_job)
except UnpickleError as e: except UnpickleError as e:
self.log.warning('*** Ignoring unpickleable data on %s.', e.queue) self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
self.log.debug('Data follows:') self.log.debug('Data follows:')
self.log.debug(e.raw_data) self.log.debug(e.raw_data)
self.log.debug('End of unreadable data.') self.log.debug('End of unreadable data.')
q = Queue('failure') fq = self.failure_queue
q._push(e.raw_data) fq._push(e.raw_data)
continue continue
if job is None: if job is None:
@ -313,9 +315,15 @@ class Worker(object):
self.log.info(msg) self.log.info(msg)
try: try:
rv = job.perform() rv = job.perform()
except Exception, e: except Exception as e:
rv = e rv = e
self.log.exception(e) self.log.exception(e)
fq = self.failure_queue
self.log.warning('Moving job to %s queue.' % (fq.name,))
job.ended_at = datetime.datetime.utcnow()
job.exc_info = e
fq._push(job.pickle())
else: else:
if rv is not None: if rv is not None:
self.log.info('Job result = %s' % (rv,)) self.log.info('Job result = %s' % (rv,))

@ -49,4 +49,21 @@ class TestWorker(RQTestCase):
self.assertEquals(failure_q.count, 1) self.assertEquals(failure_q.count, 1)
def test_work_fails(self):
"""Worker processes failing job."""
q = Queue()
failure_q = Queue('failure')
self.assertEquals(failure_q.count, 0)
self.assertEquals(q.count, 0)
q.enqueue(failing_job)
self.assertEquals(q.count, 1)
w = Worker([q])
w.work(burst=True) # should silently pass
self.assertEquals(q.count, 0)
self.assertEquals(failure_q.count, 1)

Loading…
Cancel
Save