Don't silently fail when unpickling.

When a pickled job string can't be unpickled because some required
module isn't loadable, this leads to an `UnpickleError` in the worker
(not in the horse).

Currently, we just assume that the job's data field contains "garbage" and
silently ignore it.

This is bad.

Really bad.

It bypasses the normal exception handling mechanism that RQ has.

Historically, this "feature" was introduced to ignore any invalid pickle
data ("bad strings") on queues, and go on. However, we must assume data
inside `job.data` to be valid pickle data.

While pickle data in an invalid _format_ (e.g. the string "blablah")
leads to unpickle errors, such errors will also occur when the job
can't be validly constructed in memory for other reasons, like being
unable to load a specific class.

Django is a good example of this: try submitting jobs that use
`django.conf.settings` while the `DJANGO_SETTINGS_MODULE` env var isn't
set. Currently, RQ workers will drop these jobs and dismiss them like
any invalid pickle data. You won't be notified.

This patch changes RQ's behaviour to never ignore invalid string data on
any queue and _always_ handle these errors explicitly (but without
bringing the main loop down, of course).
main
Vincent Driessen 12 years ago
parent 7216b1eedb
commit 90b15fd0b6

@ -160,6 +160,15 @@ class Job(object):
job.refresh() job.refresh()
return job return job
@classmethod
def safe_fetch(cls, id, connection=None):
"""Fetches a persisted job from its corresponding Redis key without
raising UnpickleError.

A job object is created for the given id and loaded with
`refresh(safe=True)`, so a payload that cannot be unpickled does not
propagate an UnpickleError to the caller. The loaded job is returned.
"""
job = cls(id, connection=connection)
job.refresh(safe=True)
return job
def __init__(self, id=None, connection=None): def __init__(self, id=None, connection=None):
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
self._id = id self._id = id
@ -240,7 +249,7 @@ class Job(object):
# Persistence # Persistence
def refresh(self): # noqa def refresh(self, safe=False): # noqa
"""Overwrite the current instance's properties with the values in the """Overwrite the current instance's properties with the values in the
corresponding Redis key. corresponding Redis key.
@ -257,7 +266,12 @@ class Job(object):
else: else:
return times.to_universal(date_str) return times.to_universal(date_str)
self._func_name, self._instance, self._args, self._kwargs = unpickle(obj.get('data')) # noqa self.data = obj.get('data')
try:
self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
except UnpickleError:
if not safe:
raise
self.created_at = to_date(obj.get('created_at')) self.created_at = to_date(obj.get('created_at'))
self.origin = obj.get('origin') self.origin = obj.get('origin')
self.description = obj.get('description') self.description = obj.get('description')

@ -73,7 +73,7 @@ class Queue(object):
"""Returns a list of all (valid) jobs in the queue.""" """Returns a list of all (valid) jobs in the queue."""
def safe_fetch(job_id): def safe_fetch(job_id):
try: try:
job = Job.fetch(job_id, connection=self.connection) job = Job.safe_fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
return None return None
except UnpickleError: except UnpickleError:

@ -19,7 +19,7 @@ except ImportError:
from logging import Logger from logging import Logger
from .queue import Queue, get_failed_queue from .queue import Queue, get_failed_queue
from .connections import get_current_connection from .connections import get_current_connection
from .job import Status from .job import Job, Status
from .utils import make_colorizer from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_penalty_after from .timeouts import death_penalty_after
@ -309,13 +309,8 @@ class Worker(object):
except StopRequested: except StopRequested:
break break
except UnpickleError as e: except UnpickleError as e:
msg = '*** Ignoring unpickleable data on %s.' % \ job = Job.safe_fetch(e.job_id)
green(e.queue.name) self.handle_exception(job, *sys.exc_info())
self.log.warning(msg)
self.log.debug('Data follows:')
self.log.debug(e.raw_data)
self.log.debug('End of unreadable data.')
self.failed_queue.push_job_id(e.job_id)
continue continue
self.state = 'busy' self.state = 'busy'

Loading…
Cancel
Save