diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 9d2f134..ae89b4a 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments: * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--disable-job-desc-logging`: Turn off job description logging. * `--max-jobs`: Maximum number of jobs to execute. +* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer") ## Inside the worker @@ -205,23 +206,24 @@ workers = Worker.all(queue=queue) ## Worker with Custom Serializer When creating a worker, you can pass in a custom serializer that will be implicitly passed to the queue. -Serializers used should have at least `loads` and `dumps` method. +Serializers used should have at least `loads` and `dumps` method. An example of creating a custom serializer +class can be found in serializers.py (rq.serializers.JSONSerializer). The default serializer used is `pickle` ```python -import json from rq import Worker +from rq.serialzers import JSONSerializer -job = Worker('foo', serializer=json) +job = Worker('foo', serializer=JSONSerializer) ``` or when creating from a queue ```python -import json from rq import Queue, Worker +from rq.serialzers import JSONSerializer -w = Worker(Queue('foo'), serializer=json) +w = Queue('foo', serializer=JSONSerializer) ``` Queues will now use custom serializer diff --git a/rq/cli/cli.py b/rq/cli/cli.py index f373b9e..95e8f3f 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -25,11 +25,13 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries from rq.utils import import_attribute +from rq.serializers import DefaultSerializer from rq.suspension import (suspend as connection_suspend, resume as connection_resume, is_suspended) from rq.worker_registration import clean_worker_registry + # Disable the warning that Click displays (as of Click version 5.0) when users # use unicode_literals in Python 2. # See http://click.pocoo.org/dev/python3/#unicode-literals for more details. @@ -62,7 +64,10 @@ shared_options = [ click.option('--path', '-P', default='.', help='Specify the import path.', - multiple=True) + multiple=True), + click.option('--serializer', '-S', + default=DefaultSerializer, + help='Path to serializer, defaults to rq.serializers.DefaultSerializer') ] @@ -117,7 +122,7 @@ def empty(cli_config, all, queues, **options): @click.option('--queue', required=True, type=str) @click.argument('job_ids', nargs=-1) @pass_cli_config -def requeue(cli_config, queue, all, job_class, job_ids, **options): +def requeue(cli_config, queue, all, job_class, job_ids, **options): """Requeue failed jobs.""" failed_job_registry = FailedJobRegistry(queue, @@ -203,13 +208,14 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') +@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.argument('queues', nargs=-1) @pass_cli_config def worker(cli_config, burst, logging_level, name, results_ttl, worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn, exception_handler, pid, disable_default_exception_handler, max_jobs, - with_scheduler, queues, log_format, date_format, **options): + with_scheduler, queues, log_format, date_format, serializer, **options): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} # Worker specific default arguments @@ -226,7 +232,6 @@ def worker(cli_config, burst, logging_level, name, results_ttl, setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: - cleanup_ghosts(cli_config.connection) exception_handlers = [] for h in exception_handler: @@ -247,7 +252,8 @@ def worker(cli_config, burst, logging_level, name, results_ttl, job_class=cli_config.job_class, queue_class=cli_config.queue_class, exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, - log_job_description=not disable_job_desc_logging + log_job_description=not disable_job_desc_logging, + serializer=serializer ) # Should we configure Sentry? diff --git a/tests/test_cli.py b/tests/test_cli.py index 51d4e60..8c5aa62 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,6 +12,7 @@ from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig from rq.job import Job from rq.registry import FailedJobRegistry, ScheduledJobRegistry +from rq.serializers import JSONSerializer from rq.worker import Worker, WorkerStatus import pytest @@ -346,3 +347,13 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 1) self.assertIn("Duration must be an integer greater than 1", result.output) + + def test_serializer(self): + """rq worker -u --serializer """ + connection = Redis.from_url(self.redis_url) + q = Queue('default', connection=connection, serializer=JSONSerializer) + runner = CliRunner() + job = q.enqueue(say_hello) + runner.invoke(main, ['worker', '-u', self.redis_url, + '--serializer rq.serializer.JSONSerializer']) + self.assertIn(job.id, q.job_ids)