Job compression (#907)

job.exc_info and job.data is now stored in compressed format in Redis.

* job.data is now stored in compressed format.
main
Selwin Ong 7 years ago committed by GitHub
parent 44a0a7b972
commit f500186f3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4
@ -416,10 +417,16 @@ class Job(object):
return utcparse(as_text(date_str))
try:
self.data = obj['data']
raw_data = obj['data']
except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj))
try:
self.data = zlib.decompress(raw_data)
except zlib.error:
# Fallback to uncompressed string
self.data = raw_data
self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
@ -427,7 +434,6 @@ class Job(object):
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = as_text(obj.get('exc_info'))
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
@ -435,6 +441,15 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
raw_exc_info = obj.get('exc_info')
if raw_exc_info:
try:
self.exc_info = as_text(zlib.decompress(raw_exc_info))
except zlib.error:
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
@ -444,7 +459,7 @@ class Job(object):
"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
obj['data'] = self.data
obj['data'] = zlib.compress(self.data)
if self.origin is not None:
obj['origin'] = self.origin
@ -462,7 +477,7 @@ class Job(object):
except:
obj['result'] = 'Unpickleable return value'
if self.exc_info is not None:
obj['exc_info'] = self.exc_info
obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
if self.timeout is not None:
obj['timeout'] = self.timeout
if self.result_ttl is not None:

@ -3,9 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime
import time
import time
import sys
import zlib
is_py2 = sys.version[0] == '2'
if is_py2:
@ -15,7 +16,7 @@ else:
from tests import fixtures, RQTestCase
from rq.compat import PY2
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job
from rq.queue import Queue, get_failed_queue
@ -160,7 +161,7 @@ class TestJob(RQTestCase):
self.assertEqual(self.testconn.type(job.key), b'hash')
# Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data'))
unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data')))
self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation')
def test_fetch(self):
@ -236,7 +237,8 @@ class TestJob(RQTestCase):
def test_fetching_unreadable_data(self):
"""Fetching succeeds on unreadable data, but lazy props fail."""
# Set up
job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
job = Job.create(func=fixtures.some_calculation, args=(3, 4),
kwargs=dict(z=2))
job.save()
# Just replace the data hkey with some random noise
@ -255,14 +257,57 @@ class TestJob(RQTestCase):
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace(b'say_hello', b'nay_hello')
self.testconn.hset(job.key, 'data', unimportable_data)
job_data = job.data
unimportable_data = job_data.replace(b'say_hello', b'nay_hello')
self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data))
job.refresh()
with self.assertRaises(AttributeError):
job.func # accessing the func property should fail
def test_compressed_exc_info_handling(self):
"""Jobs handle both compressed and uncompressed exc_info"""
exception_string = 'Some exception'
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.exc_info = exception_string
job.save()
# exc_info is stored in compressed format
exc_info = self.testconn.hget(job.key, 'exc_info')
self.assertEqual(
as_text(zlib.decompress(exc_info)),
exception_string
)
job.refresh()
self.assertEqual(job.exc_info, exception_string)
# Uncompressed exc_info is also handled
self.testconn.hset(job.key, 'exc_info', exception_string)
job.refresh()
self.assertEqual(job.exc_info, exception_string)
def test_compressed_job_data_handling(self):
"""Jobs handle both compressed and uncompressed data"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
# Job data is stored in compressed format
job_data = job.data
self.assertEqual(
zlib.compress(job_data),
self.testconn.hget(job.key, 'data')
)
self.testconn.hset(job.key, 'data', job_data)
job.refresh()
self.assertEqual(job.data, job_data)
def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly."""
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
@ -457,7 +502,7 @@ class TestJob(RQTestCase):
"""test if a job created with ttl expires [issue502]"""
queue = Queue(connection=self.testconn)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
time.sleep(1)
time.sleep(1.1)
self.assertEqual(0, len(queue.get_jobs()))
def test_create_and_cancel_job(self):

@ -4,13 +4,16 @@ from __future__ import (absolute_import, division, print_function,
import os
import shutil
from datetime import datetime, timedelta
from time import sleep
import signal
import time
from multiprocessing import Process
import subprocess
import sys
import time
import zlib
from datetime import datetime, timedelta
from multiprocessing import Process
from time import sleep
from unittest import skipIf
import pytest
@ -180,10 +183,11 @@ class TestWorker(RQTestCase):
# importable from the worker process.
job = Job.create(func=div_by_zero, args=(3,))
job.save()
data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace(b'div_by_zero', b'nonexisting')
assert data != invalid_data
self.testconn.hset(job.key, 'data', invalid_data)
job_data = job.data
invalid_data = job_data.replace(b'div_by_zero', b'nonexisting')
assert job_data != invalid_data
self.testconn.hset(job.key, 'data', zlib.compress(invalid_data))
# We use the low-level internal function to enqueue any data (bypassing
# validity checks)

Loading…
Cancel
Save