From f500186f3dc652278786a6223e5f24303ad81336 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 23 Nov 2017 12:55:54 +0700 Subject: [PATCH] Job compression (#907) job.exc_info and job.data is now stored in compressed format in Redis. * job.data is now stored in compressed format. --- rq/job.py | 23 ++++++++++++++--- tests/test_job.py | 61 ++++++++++++++++++++++++++++++++++++++------ tests/test_worker.py | 20 +++++++++------ 3 files changed, 84 insertions(+), 20 deletions(-) diff --git a/rq/job.py b/rq/job.py index 83baacb..eb83250 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import inspect import warnings +import zlib from functools import partial from uuid import uuid4 @@ -416,10 +417,16 @@ class Job(object): return utcparse(as_text(date_str)) try: - self.data = obj['data'] + raw_data = obj['data'] except KeyError: raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) + try: + self.data = zlib.decompress(raw_data) + except zlib.error: + # Fallback to uncompressed string + self.data = raw_data + self.created_at = to_date(as_text(obj.get('created_at'))) self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) @@ -427,7 +434,6 @@ class Job(object): self.started_at = to_date(as_text(obj.get('started_at'))) self.ended_at = to_date(as_text(obj.get('ended_at'))) self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa - self.exc_info = as_text(obj.get('exc_info')) self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) @@ -435,6 +441,15 @@ class Job(object): self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} + raw_exc_info = obj.get('exc_info') + if raw_exc_info: + try: + self.exc_info = as_text(zlib.decompress(raw_exc_info)) + except zlib.error: + # Fallback to uncompressed string + self.exc_info = as_text(raw_exc_info) + + def to_dict(self, include_meta=True): """ Returns a serialization of the current job instance @@ -444,7 +459,7 @@ class Job(object): """ obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - obj['data'] = self.data + obj['data'] = zlib.compress(self.data) if self.origin is not None: obj['origin'] = self.origin @@ -462,7 +477,7 @@ class Job(object): except: obj['result'] = 'Unpickleable return value' if self.exc_info is not None: - obj['exc_info'] = self.exc_info + obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8')) if self.timeout is not None: obj['timeout'] = self.timeout if self.result_ttl is not None: diff --git a/tests/test_job.py b/tests/test_job.py index fe9afcd..b3512a4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -3,9 +3,10 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from datetime import datetime -import time +import time import sys +import zlib is_py2 = sys.version[0] == '2' if is_py2: @@ -15,7 +16,7 @@ else: from tests import fixtures, RQTestCase -from rq.compat import PY2 +from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job from rq.queue import Queue, get_failed_queue @@ -160,7 +161,7 @@ class TestJob(RQTestCase): self.assertEqual(self.testconn.type(job.key), b'hash') # Saving writes pickled job data - unpickled_data = loads(self.testconn.hget(job.key, 'data')) + unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data'))) self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation') def test_fetch(self): @@ -236,7 +237,8 @@ class TestJob(RQTestCase): def test_fetching_unreadable_data(self): """Fetching succeeds on unreadable data, but lazy props fail.""" # Set up - job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), + kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise @@ -255,14 +257,57 @@ class TestJob(RQTestCase): # Now slightly modify the job to make it unimportable (this is # equivalent to a worker not having the most up-to-date source code # and unable to import the function) - data = self.testconn.hget(job.key, 'data') - unimportable_data = data.replace(b'say_hello', b'nay_hello') - self.testconn.hset(job.key, 'data', unimportable_data) + job_data = job.data + unimportable_data = job_data.replace(b'say_hello', b'nay_hello') + + self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data)) job.refresh() with self.assertRaises(AttributeError): job.func # accessing the func property should fail + def test_compressed_exc_info_handling(self): + """Jobs handle both compressed and uncompressed exc_info""" + exception_string = 'Some exception' + + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job.exc_info = exception_string + job.save() + + # exc_info is stored in compressed format + exc_info = self.testconn.hget(job.key, 'exc_info') + self.assertEqual( + as_text(zlib.decompress(exc_info)), + exception_string + ) + + job.refresh() + self.assertEqual(job.exc_info, exception_string) + + # Uncompressed exc_info is also handled + self.testconn.hset(job.key, 'exc_info', exception_string) + + job.refresh() + self.assertEqual(job.exc_info, exception_string) + + def test_compressed_job_data_handling(self): + """Jobs handle both compressed and uncompressed data""" + + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job.save() + + # Job data is stored in compressed format + job_data = job.data + self.assertEqual( + zlib.compress(job_data), + self.testconn.hget(job.key, 'data') + ) + + self.testconn.hset(job.key, 'data', job_data) + job.refresh() + self.assertEqual(job.data, job_data) + + def test_custom_meta_is_persisted(self): """Additional meta data on jobs are stored persisted correctly.""" job = Job.create(func=fixtures.say_hello, args=('Lionel',)) @@ -457,7 +502,7 @@ class TestJob(RQTestCase): """test if a job created with ttl expires [issue502]""" queue = Queue(connection=self.testconn) queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1) - time.sleep(1) + time.sleep(1.1) self.assertEqual(0, len(queue.get_jobs())) def test_create_and_cancel_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 4563e30..8e44d29 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,13 +4,16 @@ from __future__ import (absolute_import, division, print_function, import os import shutil -from datetime import datetime, timedelta -from time import sleep import signal -import time -from multiprocessing import Process import subprocess import sys +import time +import zlib + +from datetime import datetime, timedelta +from multiprocessing import Process +from time import sleep + from unittest import skipIf import pytest @@ -180,10 +183,11 @@ class TestWorker(RQTestCase): # importable from the worker process. job = Job.create(func=div_by_zero, args=(3,)) job.save() - data = self.testconn.hget(job.key, 'data') - invalid_data = data.replace(b'div_by_zero', b'nonexisting') - assert data != invalid_data - self.testconn.hset(job.key, 'data', invalid_data) + + job_data = job.data + invalid_data = job_data.replace(b'div_by_zero', b'nonexisting') + assert job_data != invalid_data + self.testconn.hset(job.key, 'data', zlib.compress(invalid_data)) # We use the low-level internal function to enqueue any data (bypassing # validity checks)