From 90b15fd0b63464769b71a21772d0f3e255f5cc6e Mon Sep 17 00:00:00 2001
From: Vincent Driessen
Date: Fri, 18 Jan 2013 14:25:05 +0100
Subject: [PATCH] Don't silently fail when unpickling.

When a pickled job string can't be unpickled because some required
module isn't loadable, this leads to an `UnpickleError` in the worker
(not in the horse). Currently we just assume "garbage" in the job's
data field and silently ignore it.

This is bad. Really bad. It bypasses the normal exception handling
mechanism that RQ has.

Historically, this "feature" was introduced to ignore any invalid pickle
data ("bad strings") on queues and carry on. However, we should be able
to assume that the data inside `job.data` is valid pickle data.

While an invalid _format_ of pickle data (e.g. the string "blablah"
isn't valid pickle) leads to unpickle errors, unpickling errors will
also occur when the job can't be validly constructed in memory for
other reasons, like being unable to load a specific class.

Django is a good example of this: try submitting jobs that use
`django.conf.settings` while the `DJANGO_SETTINGS_MODULE` env var isn't
set. Currently, RQ workers will drop these jobs and dismiss them like
any other invalid pickle data. You won't be notified.

This patch changes RQ's behaviour to never ignore invalid string data on
any queue and to _always_ handle these errors explicitly (but without
bringing the main loop down, of course).
---
 rq/job.py    | 18 ++++++++++++++++--
 rq/queue.py  |  2 +-
 rq/worker.py | 11 +++--------
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index 6c084ac..e94f512 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -160,6 +160,15 @@ class Job(object):
         job.refresh()
         return job
 
+    @classmethod
+    def safe_fetch(cls, id, connection=None):
+        """Fetches a persisted job from its corresponding Redis key, but does
+        not instantiate it, making it impossible to get UnpickleErrors.
+        """
+        job = cls(id, connection=connection)
+        job.refresh(safe=True)
+        return job
+
     def __init__(self, id=None, connection=None):
         self.connection = resolve_connection(connection)
         self._id = id
@@ -240,7 +249,7 @@ class Job(object):
 
     # Persistence
 
-    def refresh(self):  # noqa
+    def refresh(self, safe=False):  # noqa
         """Overwrite the current instance's properties with the values in the
         corresponding Redis key.
@@ -257,7 +266,12 @@ class Job(object):
             else:
                 return times.to_universal(date_str)
 
-        self._func_name, self._instance, self._args, self._kwargs = unpickle(obj.get('data'))  # noqa
+        self.data = obj.get('data')
+        try:
+            self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
+        except UnpickleError:
+            if not safe:
+                raise
         self.created_at = to_date(obj.get('created_at'))
         self.origin = obj.get('origin')
         self.description = obj.get('description')
diff --git a/rq/queue.py b/rq/queue.py
index f130e5b..b962b09 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -73,7 +73,7 @@ class Queue(object):
         """Returns a list of all (valid) jobs in the queue."""
         def safe_fetch(job_id):
             try:
-                job = Job.fetch(job_id, connection=self.connection)
+                job = Job.safe_fetch(job_id, connection=self.connection)
             except NoSuchJobError:
                 return None
             except UnpickleError:
diff --git a/rq/worker.py b/rq/worker.py
index ee3c317..99a3746 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -19,7 +19,7 @@ except ImportError:
     from logging import Logger
 from .queue import Queue, get_failed_queue
 from .connections import get_current_connection
-from .job import Status
+from .job import Job, Status
 from .utils import make_colorizer
 from .exceptions import NoQueueError, UnpickleError
 from .timeouts import death_penalty_after
@@ -309,13 +309,8 @@ class Worker(object):
                 except StopRequested:
                     break
                 except UnpickleError as e:
-                    msg = '*** Ignoring unpickleable data on %s.' % \
-                            green(e.queue.name)
-                    self.log.warning(msg)
-                    self.log.debug('Data follows:')
-                    self.log.debug(e.raw_data)
-                    self.log.debug('End of unreadable data.')
-                    self.failed_queue.push_job_id(e.job_id)
+                    job = Job.safe_fetch(e.job_id)
+                    self.handle_exception(job, *sys.exc_info())
                     continue
 
                 self.state = 'busy'
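
Illustration (not part of the patch): a minimal sketch of how the new
`Job.safe_fetch()` code path is expected to behave for a job whose pickled
data can't be loaded in the current process (e.g. the Django settings
scenario described above). The Redis connection and job id below are
placeholders.

    from redis import StrictRedis

    from rq.exceptions import UnpickleError
    from rq.job import Job

    conn = StrictRedis()    # placeholder: whichever Redis the queues live on
    job_id = 'some-job-id'  # placeholder: id of a job with unloadable data

    # Job.fetch() calls refresh(), which unpickles the job's data and
    # therefore raises UnpickleError when, for instance, a module the job
    # needs can't be imported in this process.
    try:
        job = Job.fetch(job_id, connection=conn)
    except UnpickleError:
        # Job.safe_fetch() calls refresh(safe=True), which swallows the
        # UnpickleError, so a Job instance comes back with its raw (still
        # pickled) payload intact in job.data. That is what the worker now
        # hands to its exception handler instead of silently dropping the job.
        job = Job.safe_fetch(job_id, connection=conn)
        print(job.data)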