diff --git a/rq/scheduler.py b/rq/scheduler.py index c22aed7..1b5a8ff 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -7,13 +7,14 @@ from datetime import datetime from enum import Enum from multiprocessing import Process -from redis import Redis, SSLConnection, UnixDomainSocketConnection +from redis import SSLConnection, UnixDomainSocketConnection from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT from .job import Job from .logutils import setup_loghandlers from .queue import Queue from .registry import ScheduledJobRegistry +from .serializers import resolve_serializer from .utils import current_timestamp SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' @@ -35,7 +36,7 @@ class RQScheduler: def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT): + log_format=DEFAULT_LOGGING_FORMAT, serializer=None): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() self._scheduled_job_registries = [] @@ -60,6 +61,7 @@ class RQScheduler: self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( 'path' ) + self.serializer = resolve_serializer(serializer) self._connection = None self.interval = interval @@ -152,10 +154,12 @@ class RQScheduler: if not job_ids: continue - queue = Queue(registry.name, connection=self.connection) + queue = Queue(registry.name, connection=self.connection, serializer=self.serializer) with self.connection.pipeline() as pipeline: - jobs = Job.fetch_many(job_ids, connection=self.connection) + jobs = Job.fetch_many( + job_ids, connection=self.connection, serializer=self.serializer + ) for job in jobs: if job is not None: queue.enqueue_job(job, pipeline=pipeline) diff --git a/rq/version.py b/rq/version.py index 14f86eb..2658962 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '1.8.0' +VERSION = '1.8.1' diff --git a/rq/worker.py b/rq/worker.py index d9dd977..3d44ca8 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -557,7 +557,7 @@ class Worker: if with_scheduler: self.scheduler = RQScheduler( self.queues, connection=self.connection, logging_level=logging_level, - date_format=date_format, log_format=log_format) + date_format=date_format, log_format=log_format, serializer=self.serializer) self.scheduler.acquire_locks() # If lock is acquired, start scheduler if self.scheduler.acquired_locks: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index de248d2..0cdcec4 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -9,6 +9,7 @@ from rq.exceptions import NoSuchJobError from rq.job import Job, Retry from rq.registry import FinishedJobRegistry, ScheduledJobRegistry from rq.scheduler import RQScheduler +from rq.serializers import JSONSerializer from rq.utils import current_timestamp from rq.worker import Worker @@ -309,6 +310,21 @@ class TestWorker(RQTestCase): registry = FinishedJobRegistry(queue=queue) self.assertEqual(len(registry), 1) + def test_work_with_serializer(self): + queue = Queue(connection=self.testconn, serializer=JSONSerializer) + worker = Worker(queues=[queue], connection=self.testconn, serializer=JSONSerializer) + p = Process(target=kill_worker, args=(os.getpid(), False, 5)) + + p.start() + queue.enqueue_at( + datetime(2019, 1, 1, tzinfo=timezone.utc), + say_hello, meta={'foo': 'bar'} + ) + worker.work(burst=False, with_scheduler=True) + p.join(1) + self.assertIsNotNone(worker.scheduler) + registry = FinishedJobRegistry(queue=queue) + self.assertEqual(len(registry), 1) class TestQueue(RQTestCase):