Make RQScheduler work with a serializer (#1455)

* Scheduler with a serializer

* test with metadata

Co-authored-by: alella <ashoka.lella@factset.com>
main
Ashoka Lella 4 years ago committed by GitHub
parent 428d75b9b6
commit d333d20914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,13 +7,14 @@ from datetime import datetime
from enum import Enum from enum import Enum
from multiprocessing import Process from multiprocessing import Process
from redis import Redis, SSLConnection, UnixDomainSocketConnection from redis import SSLConnection, UnixDomainSocketConnection
from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT
from .job import Job from .job import Job
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import Queue from .queue import Queue
from .registry import ScheduledJobRegistry from .registry import ScheduledJobRegistry
from .serializers import resolve_serializer
from .utils import current_timestamp from .utils import current_timestamp
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
@ -35,7 +36,7 @@ class RQScheduler:
def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, def __init__(self, queues, connection, interval=1, logging_level=logging.INFO,
date_format=DEFAULT_LOGGING_DATE_FORMAT, date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT): log_format=DEFAULT_LOGGING_FORMAT, serializer=None):
self._queue_names = set(parse_names(queues)) self._queue_names = set(parse_names(queues))
self._acquired_locks = set() self._acquired_locks = set()
self._scheduled_job_registries = [] self._scheduled_job_registries = []
@ -60,6 +61,7 @@ class RQScheduler:
self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop(
'path' 'path'
) )
self.serializer = resolve_serializer(serializer)
self._connection = None self._connection = None
self.interval = interval self.interval = interval
@ -152,10 +154,12 @@ class RQScheduler:
if not job_ids: if not job_ids:
continue continue
queue = Queue(registry.name, connection=self.connection) queue = Queue(registry.name, connection=self.connection, serializer=self.serializer)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
jobs = Job.fetch_many(job_ids, connection=self.connection) jobs = Job.fetch_many(
job_ids, connection=self.connection, serializer=self.serializer
)
for job in jobs: for job in jobs:
if job is not None: if job is not None:
queue.enqueue_job(job, pipeline=pipeline) queue.enqueue_job(job, pipeline=pipeline)

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
VERSION = '1.8.0' VERSION = '1.8.1'

@ -557,7 +557,7 @@ class Worker:
if with_scheduler: if with_scheduler:
self.scheduler = RQScheduler( self.scheduler = RQScheduler(
self.queues, connection=self.connection, logging_level=logging_level, self.queues, connection=self.connection, logging_level=logging_level,
date_format=date_format, log_format=log_format) date_format=date_format, log_format=log_format, serializer=self.serializer)
self.scheduler.acquire_locks() self.scheduler.acquire_locks()
# If lock is acquired, start scheduler # If lock is acquired, start scheduler
if self.scheduler.acquired_locks: if self.scheduler.acquired_locks:

@ -9,6 +9,7 @@ from rq.exceptions import NoSuchJobError
from rq.job import Job, Retry from rq.job import Job, Retry
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
from rq.scheduler import RQScheduler from rq.scheduler import RQScheduler
from rq.serializers import JSONSerializer
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
@ -309,6 +310,21 @@ class TestWorker(RQTestCase):
registry = FinishedJobRegistry(queue=queue) registry = FinishedJobRegistry(queue=queue)
self.assertEqual(len(registry), 1) self.assertEqual(len(registry), 1)
def test_work_with_serializer(self):
queue = Queue(connection=self.testconn, serializer=JSONSerializer)
worker = Worker(queues=[queue], connection=self.testconn, serializer=JSONSerializer)
p = Process(target=kill_worker, args=(os.getpid(), False, 5))
p.start()
queue.enqueue_at(
datetime(2019, 1, 1, tzinfo=timezone.utc),
say_hello, meta={'foo': 'bar'}
)
worker.work(burst=False, with_scheduler=True)
p.join(1)
self.assertIsNotNone(worker.scheduler)
registry = FinishedJobRegistry(queue=queue)
self.assertEqual(len(registry), 1)
class TestQueue(RQTestCase): class TestQueue(RQTestCase):

Loading…
Cancel
Save