Fix custom serializer in job fetches (#1381)

* Ensure that the custom serializer defined is passed into the job fetch calls

* add serializer as argument to fetch_many and dequeue_any methods

* add worker test for custom serializer

* move json serializer to serializers.py
main
JackBoreczky 4 years ago committed by GitHub
parent 8b9e2188e4
commit 016da14723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -190,7 +190,7 @@ class Job(object):
return None return None
if hasattr(self, '_dependency'): if hasattr(self, '_dependency'):
return self._dependency return self._dependency
job = self.fetch(self._dependency_ids[0], connection=self.connection) job = self.fetch(self._dependency_ids[0], connection=self.connection, serializer=self.serializer)
self._dependency = job self._dependency = job
return job return job
@ -301,7 +301,7 @@ class Job(object):
return job return job
@classmethod @classmethod
def fetch_many(cls, job_ids, connection): def fetch_many(cls, job_ids, connection, serializer=None):
""" """
Bulk version of Job.fetch Bulk version of Job.fetch
@ -316,7 +316,7 @@ class Job(object):
jobs = [] jobs = []
for i, job_id in enumerate(job_ids): for i, job_id in enumerate(job_ids):
if results[i]: if results[i]:
job = cls(job_id, connection=connection) job = cls(job_id, connection=connection, serializer=serializer)
job.restore(results[i]) job.restore(results[i])
jobs.append(job) jobs.append(job)
else: else:
@ -679,7 +679,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
for dependent_id in self.dependent_ids: for dependent_id in self.dependent_ids:
try: try:
job = Job.fetch(dependent_id, connection=self.connection) job = Job.fetch(dependent_id, connection=self.connection, serializer=self.serializer)
job.delete(pipeline=pipeline, job.delete(pipeline=pipeline,
remove_from_queue=False) remove_from_queue=False)
except NoSuchJobError: except NoSuchJobError:

@ -155,7 +155,7 @@ class Queue(object):
def fetch_job(self, job_id): def fetch_job(self, job_id):
try: try:
job = self.job_class.fetch(job_id, connection=self.connection) job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
else: else:
@ -511,7 +511,8 @@ nd
dependent_job for dependent_job dependent_job for dependent_job
in self.job_class.fetch_many( in self.job_class.fetch_many(
dependent_job_ids, dependent_job_ids,
connection=self.connection connection=self.connection,
serializer=self.serializer
) if dependent_job.dependencies_are_met( ) if dependent_job.dependencies_are_met(
exclude_job_id=job.id, exclude_job_id=job.id,
pipeline=pipe pipeline=pipe
@ -581,7 +582,7 @@ nd
return None return None
@classmethod @classmethod
def dequeue_any(cls, queues, timeout, connection=None, job_class=None): def dequeue_any(cls, queues, timeout, connection=None, job_class=None, serializer=None):
"""Class method returning the job_class instance at the front of the given """Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
@ -602,9 +603,10 @@ nd
queue_key, job_id = map(as_text, result) queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, queue = cls.from_queue_key(queue_key,
connection=connection, connection=connection,
job_class=job_class) job_class=job_class,
serializer=serializer)
try: try:
job = job_class.fetch(job_id, connection=connection) job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError: except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue in the look # and continue in the look

@ -1,5 +1,6 @@
from functools import partial from functools import partial
import pickle import pickle
import json
from .compat import string_types from .compat import string_types
from .utils import import_attribute from .utils import import_attribute
@ -10,6 +11,16 @@ class DefaultSerializer:
loads = pickle.loads loads = pickle.loads
class JSONSerializer():
@staticmethod
def dumps(*args, **kwargs):
return json.dumps(*args, **kwargs).encode('utf-8')
@staticmethod
def loads(s, *args, **kwargs):
return json.loads(s.decode('utf-8'), *args, **kwargs)
def resolve_serializer(serializer): def resolve_serializer(serializer):
"""This function checks the user defined serializer for ('dumps', 'loads') methods """This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer It returns a default pickle serializer if not found else it returns a MySerializer

@ -387,7 +387,7 @@ class Worker(object):
if job_id is None: if job_id is None:
return None return None
return self.job_class.fetch(job_id, self.connection) return self.job_class.fetch(job_id, self.connection, self.serializer)
def _install_signal_handlers(self): def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM """Installs signal handlers for handling SIGINT and SIGTERM
@ -638,7 +638,8 @@ class Worker(object):
try: try:
result = self.queue_class.dequeue_any(self.queues, timeout, result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection, connection=self.connection,
job_class=self.job_class) job_class=self.job_class,
serializer=self.serializer)
if result is not None: if result is not None:
job, queue = result job, queue = result

@ -38,6 +38,7 @@ from rq.suspension import resume, suspend
from rq.utils import utcnow from rq.utils import utcnow
from rq.version import VERSION from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus from rq.worker import HerokuWorker, WorkerStatus
from rq.serializers import JSONSerializer
class CustomJob(Job): class CustomJob(Job):
pass pass
@ -122,6 +123,21 @@ class TestWorker(RQTestCase):
'Expected at least some work done.' 'Expected at least some work done.'
) )
def test_work_and_quit_custom_serializer(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo', serializer=JSONSerializer), Queue('bar', serializer=JSONSerializer)
w = Worker([fooq, barq], serializer=JSONSerializer)
self.assertEqual(
w.work(burst=True), False,
'Did not expect any work on the queue.'
)
fooq.enqueue(say_hello, name='Frank')
self.assertEqual(
w.work(burst=True), True,
'Expected at least some work done.'
)
def test_worker_all(self): def test_worker_all(self):
"""Worker.all() works properly""" """Worker.all() works properly"""
foo_queue = Queue('foo') foo_queue = Queue('foo')

Loading…
Cancel
Save