Added --serializer option to cli, finishing off PR #1381 and fix #1357 (#1395)

* Added --serializer option to cli, finishing off PR #1381 and fix #1357

* Update documentation

* Update documentation

* Modified help message

Co-authored-by: f0cker <dturner@trustwave.com>
main
f0cker 4 years ago committed by GitHub
parent 11c8631921
commit efe703214e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging. * `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute. * `--max-jobs`: Maximum number of jobs to execute.
* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer")
## Inside the worker ## Inside the worker
@ -205,23 +206,24 @@ workers = Worker.all(queue=queue)
## Worker with Custom Serializer ## Worker with Custom Serializer
When creating a worker, you can pass in a custom serializer that will be implicitly passed to the queue. When creating a worker, you can pass in a custom serializer that will be implicitly passed to the queue.
Serializers used should have at least `loads` and `dumps` method. Serializers used should have at least `loads` and `dumps` method. An example of creating a custom serializer
class can be found in serializers.py (rq.serializers.JSONSerializer).
The default serializer used is `pickle` The default serializer used is `pickle`
```python ```python
import json
from rq import Worker from rq import Worker
from rq.serialzers import JSONSerializer
job = Worker('foo', serializer=json) job = Worker('foo', serializer=JSONSerializer)
``` ```
or when creating from a queue or when creating from a queue
```python ```python
import json
from rq import Queue, Worker from rq import Queue, Worker
from rq.serialzers import JSONSerializer
w = Worker(Queue('foo'), serializer=json) w = Queue('foo', serializer=JSONSerializer)
``` ```
Queues will now use custom serializer Queues will now use custom serializer

@ -25,11 +25,13 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries from rq.registry import FailedJobRegistry, clean_registries
from rq.utils import import_attribute from rq.utils import import_attribute
from rq.serializers import DefaultSerializer
from rq.suspension import (suspend as connection_suspend, from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended) resume as connection_resume, is_suspended)
from rq.worker_registration import clean_worker_registry from rq.worker_registration import clean_worker_registry
# Disable the warning that Click displays (as of Click version 5.0) when users # Disable the warning that Click displays (as of Click version 5.0) when users
# use unicode_literals in Python 2. # use unicode_literals in Python 2.
# See http://click.pocoo.org/dev/python3/#unicode-literals for more details. # See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
@ -62,7 +64,10 @@ shared_options = [
click.option('--path', '-P', click.option('--path', '-P',
default='.', default='.',
help='Specify the import path.', help='Specify the import path.',
multiple=True) multiple=True),
click.option('--serializer', '-S',
default=DefaultSerializer,
help='Path to serializer, defaults to rq.serializers.DefaultSerializer')
] ]
@ -117,7 +122,7 @@ def empty(cli_config, all, queues, **options):
@click.option('--queue', required=True, type=str) @click.option('--queue', required=True, type=str)
@click.argument('job_ids', nargs=-1) @click.argument('job_ids', nargs=-1)
@pass_cli_config @pass_cli_config
def requeue(cli_config, queue, all, job_class, job_ids, **options): def requeue(cli_config, queue, all, job_class, job_ids, **options):
"""Requeue failed jobs.""" """Requeue failed jobs."""
failed_job_registry = FailedJobRegistry(queue, failed_job_registry = FailedJobRegistry(queue,
@ -203,13 +208,14 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@pass_cli_config @pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl, def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, disable_job_desc_logging, worker_ttl, job_monitoring_interval, disable_job_desc_logging,
verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn, verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn,
exception_handler, pid, disable_default_exception_handler, max_jobs, exception_handler, pid, disable_default_exception_handler, max_jobs,
with_scheduler, queues, log_format, date_format, **options): with_scheduler, queues, log_format, date_format, serializer, **options):
"""Starts an RQ worker.""" """Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {} settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments # Worker specific default arguments
@ -226,7 +232,6 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
setup_loghandlers_from_args(verbose, quiet, date_format, log_format) setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try: try:
cleanup_ghosts(cli_config.connection) cleanup_ghosts(cli_config.connection)
exception_handlers = [] exception_handlers = []
for h in exception_handler: for h in exception_handler:
@ -247,7 +252,8 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
job_class=cli_config.job_class, queue_class=cli_config.queue_class, job_class=cli_config.job_class, queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None, exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler, disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging log_job_description=not disable_job_desc_logging,
serializer=serializer
) )
# Should we configure Sentry? # Should we configure Sentry?

@ -12,6 +12,7 @@ from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.serializers import JSONSerializer
from rq.worker import Worker, WorkerStatus from rq.worker import Worker, WorkerStatus
import pytest import pytest
@ -346,3 +347,13 @@ class TestRQCli(RQTestCase):
self.assertEqual(result.exit_code, 1) self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output) self.assertIn("Duration must be an integer greater than 1", result.output)
def test_serializer(self):
"""rq worker -u <url> --serializer <serializer>"""
connection = Redis.from_url(self.redis_url)
q = Queue('default', connection=connection, serializer=JSONSerializer)
runner = CliRunner()
job = q.enqueue(say_hello)
runner.invoke(main, ['worker', '-u', self.redis_url,
'--serializer rq.serializer.JSONSerializer'])
self.assertIn(job.id, q.job_ids)

Loading…
Cancel
Save