From b80045d615d539cf4c565c5541694efe13ea648e Mon Sep 17 00:00:00 2001 From: Josh Cohen Date: Mon, 23 Aug 2021 20:40:29 -0400 Subject: [PATCH] Respect serializer (#1538) * Add serializer where missing in code * Fix cli * Pass option to command * Add tests for serializer option * Merge branch 'master' into respect-serializer - Update enqueue cli to resp. serializer * Address @selwin's review --- rq/cli/cli.py | 13 +++--- rq/command.py | 4 +- rq/job.py | 43 +++++++++++++------- rq/queue.py | 14 ++++--- rq/registry.py | 29 ++++++++----- rq/scheduler.py | 2 +- rq/worker.py | 5 ++- tests/test_cli.py | 65 +++++++++++++++++++++++++++++ tests/test_commands.py | 11 ++--- tests/test_job.py | 51 +++++++++++++---------- tests/test_queue.py | 19 ++++++++- tests/test_registry.py | 92 ++++++++++++++++++++++++++++++++++++++++-- 12 files changed, 276 insertions(+), 72 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index d1463de..ab16b54 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -126,11 +126,13 @@ def empty(cli_config, all, queues, **options): @click.option('--queue', required=True, type=str) @click.argument('job_ids', nargs=-1) @pass_cli_config -def requeue(cli_config, queue, all, job_class, job_ids, **options): +def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): """Requeue failed jobs.""" failed_job_registry = FailedJobRegistry(queue, - connection=cli_config.connection) + connection=cli_config.connection, + job_class=job_class, + serializer=serializer) if all: job_ids = failed_job_registry.get_job_ids() @@ -321,7 +323,7 @@ def resume(cli_config, **options): multiple=True) @click.option('--job-id', help='The id of this job') @click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end') -@click.option('--retry-max', help='Maximum amound of retries', default=0, type=int) +@click.option('--retry-max', help='Maximum amount of retries', default=0, type=int) @click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0]) @click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).') @click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' @@ -331,10 +333,9 @@ def resume(cli_config, **options): @click.argument('arguments', nargs=-1) @pass_cli_config def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front, - retry_max, retry_interval, schedule_in, schedule_at, quiet, function, arguments, **options): + retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) - function_string = get_call_string(function, args, kwargs) description = description or function_string @@ -345,7 +346,7 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio schedule = parse_schedule(schedule_in, schedule_at) with Connection(cli_config.connection): - queue = cli_config.queue_class(queue) + queue = cli_config.queue_class(queue, serializer=serializer) if schedule is None: job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, diff --git a/rq/command.py b/rq/command.py index e8ebcc5..e478cbc 100644 --- a/rq/command.py +++ b/rq/command.py @@ -32,9 +32,9 @@ def send_kill_horse_command(connection, worker_name): send_command(connection, worker_name, 'kill-horse') -def send_stop_job_command(connection, job_id): +def send_stop_job_command(connection, job_id, serializer=None): """Instruct a worker to stop a job""" - job = Job.fetch(job_id, connection=connection) + job = Job.fetch(job_id, connection=connection, serializer=serializer) if not job.worker_name: raise InvalidJobOperation('Job is not currently executing') send_command(connection, job.worker_name, 'stop-job', job_id=job_id) diff --git a/rq/job.py b/rq/job.py index be308ad..ebb87c1 100644 --- a/rq/job.py +++ b/rq/job.py @@ -46,11 +46,11 @@ class JobStatus(str, Enum): UNEVALUATED = object() -def cancel_job(job_id, connection=None): +def cancel_job(job_id, connection=None, serializer=None): """Cancels the job with the given job ID, preventing execution. Discards any job info (i.e. it can't be requeued later). """ - Job.fetch(job_id, connection=connection).cancel() + Job.fetch(job_id, connection=connection, serializer=serializer).cancel() def get_current_job(connection=None, job_class=None): @@ -63,8 +63,8 @@ def get_current_job(connection=None, job_class=None): return _job_stack.top -def requeue_job(job_id, connection): - job = Job.fetch(job_id, connection=connection) +def requeue_job(job_id, connection, serializer=None): + job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() @@ -689,12 +689,17 @@ class Job: from .registry import CanceledJobRegistry from .queue import Queue - q = Queue(name=self.origin, connection=self.connection) + q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) q.remove(self, pipeline=pipeline) self.set_status(JobStatus.CANCELED, pipeline=pipeline) - registry = CanceledJobRegistry(self.origin, self.connection, job_class=self.__class__) + registry = CanceledJobRegistry( + self.origin, + self.connection, + job_class=self.__class__, + serializer=self.serializer + ) registry.add(self, pipeline=pipeline) pipeline.execute() @@ -711,35 +716,39 @@ class Job: if remove_from_queue: from .queue import Queue - q = Queue(name=self.origin, connection=self.connection) + q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) q.remove(self, pipeline=pipeline) if self.is_finished: from .registry import FinishedJobRegistry registry = FinishedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.remove(self, pipeline=pipeline) elif self.is_deferred: from .registry import DeferredJobRegistry registry = DeferredJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.remove(self, pipeline=pipeline) elif self.is_started: from .registry import StartedJobRegistry registry = StartedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.remove(self, pipeline=pipeline) elif self.is_scheduled: from .registry import ScheduledJobRegistry registry = ScheduledJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.remove(self, pipeline=pipeline) elif self.is_failed: @@ -748,7 +757,8 @@ class Job: elif self.is_canceled: from .registry import CanceledJobRegistry registry = CanceledJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.remove(self, pipeline=pipeline) if delete_dependents: @@ -849,13 +859,15 @@ class Job: def started_job_registry(self): from .registry import StartedJobRegistry return StartedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) @property def failed_job_registry(self): from .registry import FailedJobRegistry return FailedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) def get_retry_interval(self): """Returns the desired retry interval. @@ -894,7 +906,8 @@ class Job: registry = DeferredJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__) + job_class=self.__class__, + serializer=self.serializer) registry.add(self, pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection diff --git a/rq/queue.py b/rq/queue.py index febc361..4b477f4 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -229,31 +229,32 @@ class Queue: def failed_job_registry(self): """Returns this queue's FailedJobRegistry.""" from rq.registry import FailedJobRegistry - return FailedJobRegistry(queue=self, job_class=self.job_class) + return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def started_job_registry(self): """Returns this queue's StartedJobRegistry.""" from rq.registry import StartedJobRegistry - return StartedJobRegistry(queue=self, job_class=self.job_class) + return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def finished_job_registry(self): """Returns this queue's FinishedJobRegistry.""" from rq.registry import FinishedJobRegistry - return FinishedJobRegistry(queue=self) + # TODO: Why was job_class only ommited here before? Was it intentional? + return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def deferred_job_registry(self): """Returns this queue's DeferredJobRegistry.""" from rq.registry import DeferredJobRegistry - return DeferredJobRegistry(queue=self, job_class=self.job_class) + return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def scheduled_job_registry(self): """Returns this queue's ScheduledJobRegistry.""" from rq.registry import ScheduledJobRegistry - return ScheduledJobRegistry(queue=self, job_class=self.job_class) + return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) def remove(self, job_or_id, pipeline=None): """Removes Job from queue, accepts either a Job instance or ID.""" @@ -609,7 +610,8 @@ nd for dependent in jobs_to_enqueue: registry = DeferredJobRegistry(dependent.origin, self.connection, - job_class=self.job_class) + job_class=self.job_class, + serializer=self.serializer) registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: self.enqueue_job(dependent, pipeline=pipe) diff --git a/rq/registry.py b/rq/registry.py index 38bdb3c..ac8bca7 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,4 +1,5 @@ import calendar +from rq.serializers import resolve_serializer import time from datetime import datetime, timedelta, timezone @@ -21,13 +22,15 @@ class BaseRegistry: key_template = 'rq:registry:{0}' def __init__(self, name='default', connection=None, job_class=None, - queue=None): + queue=None, serializer=None): if queue: self.name = queue.name self.connection = resolve_connection(queue.connection) + self.serializer = queue.serializer else: self.name = name self.connection = resolve_connection(connection) + self.serializer = resolve_serializer(serializer) self.key = self.key_template.format(self.name) self.job_class = backend_class(self, 'job_class', override=job_class) @@ -77,7 +80,7 @@ class BaseRegistry: if isinstance(job, self.job_class): job_instance = job else: - job_instance = Job.fetch(job_id, connection=connection) + job_instance = Job.fetch(job_id, connection=connection, serializer=self.serializer) job_instance.delete() return result @@ -100,7 +103,7 @@ class BaseRegistry: def get_queue(self): """Returns Queue object associated with this registry.""" - return Queue(self.name, connection=self.connection) + return Queue(self.name, connection=self.connection, serializer=self.serializer) def get_expiration_time(self, job): """Returns job's expiration time.""" @@ -111,8 +114,10 @@ class BaseRegistry: """Requeues the job with the given job ID.""" if isinstance(job_or_id, self.job_class): job = job_or_id + serializer = job.serializer else: - job = self.job_class.fetch(job_or_id, connection=self.connection) + serializer = self.serializer + job = self.job_class.fetch(job_or_id, connection=self.connection, serializer=serializer) result = self.connection.zrem(self.key, job.id) if not result: @@ -120,7 +125,7 @@ class BaseRegistry: with self.connection.pipeline() as pipeline: queue = Queue(job.origin, connection=self.connection, - job_class=self.job_class) + job_class=self.job_class, serializer=serializer) job.started_at = None job.ended_at = None job.exc_info = '' @@ -152,13 +157,14 @@ class StartedJobRegistry(BaseRegistry): job_ids = self.get_expired_job_ids(score) if job_ids: - failed_job_registry = FailedJobRegistry(self.name, self.connection) + failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer) with self.connection.pipeline() as pipeline: for job_id in job_ids: try: job = self.job_class.fetch(job_id, - connection=self.connection) + connection=self.connection, + serializer=self.serializer) except NoSuchJobError: continue @@ -326,14 +332,17 @@ def clean_registries(queue): """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue.""" registry = FinishedJobRegistry(name=queue.name, connection=queue.connection, - job_class=queue.job_class) + job_class=queue.job_class, + serializer=queue.serializer) registry.cleanup() registry = StartedJobRegistry(name=queue.name, connection=queue.connection, - job_class=queue.job_class) + job_class=queue.job_class, + serializer=queue.serializer) registry.cleanup() registry = FailedJobRegistry(name=queue.name, connection=queue.connection, - job_class=queue.job_class) + job_class=queue.job_class, + serializer=queue.serializer) registry.cleanup() diff --git a/rq/scheduler.py b/rq/scheduler.py index 1b5a8ff..ce5e754 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -129,7 +129,7 @@ class RQScheduler: queue_names = self._acquired_locks for name in queue_names: self._scheduled_job_registries.append( - ScheduledJobRegistry(name, connection=self.connection) + ScheduledJobRegistry(name, connection=self.connection, serializer=self.serializer) ) @classmethod diff --git a/rq/worker.py b/rq/worker.py index a8ca109..46ace25 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -932,7 +932,8 @@ class Worker: started_job_registry = StartedJobRegistry( job.origin, self.connection, - job_class=self.job_class + job_class=self.job_class, + serializer=self.serializer ) job.worker_name = None @@ -953,7 +954,7 @@ class Worker: if not self.disable_default_exception_handler and not retry: failed_job_registry = FailedJobRegistry(job.origin, job.connection, - job_class=self.job_class) + job_class=self.job_class, serializer=job.serializer) failed_job_registry.add(job, ttl=job.failure_ttl, exc_string=exc_string, pipeline=pipeline) diff --git a/tests/test_cli.py b/tests/test_cli.py index e653355..72fc510 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -168,6 +168,45 @@ class TestRQCli(RQTestCase): self.assertNotIn(job2, registry) self.assertNotIn(job3, registry) + def test_requeue_with_serializer(self): + """rq requeue -u -S --all""" + connection = Redis.from_url(self.redis_url) + queue = Queue('requeue', connection=connection, serializer=JSONSerializer) + registry = queue.failed_job_registry + + runner = CliRunner() + + job = queue.enqueue(div_by_zero) + job2 = queue.enqueue(div_by_zero) + job3 = queue.enqueue(div_by_zero) + + worker = Worker([queue], serializer=JSONSerializer) + worker.work(burst=True) + + self.assertIn(job, registry) + self.assertIn(job2, registry) + self.assertIn(job3, registry) + + result = runner.invoke( + main, + ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id] + ) + self.assert_normal_execution(result) + + # Only the first specified job is requeued + self.assertNotIn(job, registry) + self.assertIn(job2, registry) + self.assertIn(job3, registry) + + result = runner.invoke( + main, + ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all'] + ) + self.assert_normal_execution(result) + # With --all flag, all failed jobs are requeued + self.assertNotIn(job2, registry) + self.assertNotIn(job3, registry) + def test_info(self): """rq info -u """ runner = CliRunner() @@ -398,6 +437,32 @@ class TestRQCli(RQTestCase): worker.work(True) self.assertEqual(Job(job_id).result, 'Hi there, Stranger!') + def test_cli_enqueue_with_serializer(self): + """rq enqueue -u -S rq.serializers.JSONSerializer tests.fixtures.say_hello""" + queue = Queue(connection=self.connection, serializer=JSONSerializer) + self.assertTrue(queue.is_empty()) + + runner = CliRunner() + result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello']) + self.assert_normal_execution(result) + + prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' + suffix = '\'.\n' + + print(result.stdout) + + self.assertTrue(result.stdout.startswith(prefix)) + self.assertTrue(result.stdout.endswith(suffix)) + + job_id = result.stdout[len(prefix):-len(suffix)] + queue_key = 'rq:queue:default' + self.assertEqual(self.connection.llen(queue_key), 1) + self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) + + worker = Worker(queue, serializer=JSONSerializer) + worker.work(True) + self.assertEqual(Job(job_id, serializer=JSONSerializer).result, 'Hi there, Stranger!') + def test_cli_enqueue_args(self): """rq enqueue -u tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def""" queue = Queue(connection=self.connection) diff --git a/tests/test_commands.py b/tests/test_commands.py index f402daa..500906a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,3 +1,4 @@ +from rq.serializers import JSONSerializer import time from multiprocessing import Process @@ -63,17 +64,17 @@ class TestCommands(RQTestCase): """Ensure that stop_job command works properly.""" connection = self.testconn - queue = Queue('foo', connection=connection) + queue = Queue('foo', connection=connection, serializer=JSONSerializer) job = queue.enqueue(long_running_job, 3) - worker = Worker('foo', connection=connection) + worker = Worker('foo', connection=connection, serializer=JSONSerializer) # If job is not executing, an error is raised with self.assertRaises(InvalidJobOperation): - send_stop_job_command(connection, job_id=job.id) + send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer) # An exception is raised if job ID is invalid with self.assertRaises(NoSuchJobError): - send_stop_job_command(connection, job_id='1') + send_stop_job_command(connection, job_id='1', serializer=JSONSerializer) def start_work(): worker.work(burst=True) @@ -90,7 +91,7 @@ class TestCommands(RQTestCase): worker.refresh() self.assertEqual(worker.get_state(), WorkerStatus.BUSY) - send_stop_job_command(connection, job_id=job.id) + send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer) time.sleep(0.25) # Job status is set appropriately diff --git a/tests/test_job.py b/tests/test_job.py index e18c07e..cdf0dae 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import json +from rq.serializers import JSONSerializer import time import queue import zlib @@ -596,9 +597,9 @@ class TestJob(RQTestCase): Wthout a save, the dependent job is never saved into redis. The delete method will get and pass a NoSuchJobError. """ - queue = Queue(connection=self.testconn) + queue = Queue(connection=self.testconn, serializer=JSONSerializer) job = queue.enqueue(fixtures.say_hello) - job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer) job2.register_dependency() job.delete() @@ -614,49 +615,49 @@ class TestJob(RQTestCase): def test_job_delete_removes_itself_from_registries(self): """job.delete() should remove itself from job registries""" job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED, - connection=self.testconn, origin='default') + connection=self.testconn, origin='default', serializer=JSONSerializer) job.save() - registry = FailedJobRegistry(connection=self.testconn) + registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer) registry.add(job, 500) job.delete() self.assertFalse(job in registry) job = Job.create(func=fixtures.say_hello, status=JobStatus.FINISHED, - connection=self.testconn, origin='default') + connection=self.testconn, origin='default', serializer=JSONSerializer) job.save() - registry = FinishedJobRegistry(connection=self.testconn) + registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer) registry.add(job, 500) job.delete() self.assertFalse(job in registry) job = Job.create(func=fixtures.say_hello, status=JobStatus.STARTED, - connection=self.testconn, origin='default') + connection=self.testconn, origin='default', serializer=JSONSerializer) job.save() - registry = StartedJobRegistry(connection=self.testconn) + registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer) registry.add(job, 500) job.delete() self.assertFalse(job in registry) job = Job.create(func=fixtures.say_hello, status=JobStatus.DEFERRED, - connection=self.testconn, origin='default') + connection=self.testconn, origin='default', serializer=JSONSerializer) job.save() - registry = DeferredJobRegistry(connection=self.testconn) + registry = DeferredJobRegistry(connection=self.testconn, serializer=JSONSerializer) registry.add(job, 500) job.delete() self.assertFalse(job in registry) job = Job.create(func=fixtures.say_hello, status=JobStatus.SCHEDULED, - connection=self.testconn, origin='default') + connection=self.testconn, origin='default', serializer=JSONSerializer) job.save() - registry = ScheduledJobRegistry(connection=self.testconn) + registry = ScheduledJobRegistry(connection=self.testconn, serializer=JSONSerializer) registry.add(job, 500) job.delete() @@ -665,9 +666,9 @@ class TestJob(RQTestCase): def test_job_with_dependents_delete_parent_with_saved(self): """job.delete() deletes itself from Redis but not dependents. If the dependent job was saved, it will remain in redis.""" - queue = Queue(connection=self.testconn) + queue = Queue(connection=self.testconn, serializer=JSONSerializer) job = queue.enqueue(fixtures.say_hello) - job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer) job2.register_dependency() job2.save() @@ -683,10 +684,10 @@ class TestJob(RQTestCase): def test_job_with_dependents_deleteall(self): """job.delete() deletes itself from Redis. Dependents need to be - deleted explictely.""" - queue = Queue(connection=self.testconn) + deleted explicitly.""" + queue = Queue(connection=self.testconn, serializer=JSONSerializer) job = queue.enqueue(fixtures.say_hello) - job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer) job2.register_dependency() job.delete(delete_dependents=True) @@ -701,9 +702,9 @@ class TestJob(RQTestCase): deleted explictely. Without a save, the dependent job is never saved into redis. The delete method will get and pass a NoSuchJobError. """ - queue = Queue(connection=self.testconn) + queue = Queue(connection=self.testconn, serializer=JSONSerializer) job = queue.enqueue(fixtures.say_hello) - job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer) job2.register_dependency() job2.save() @@ -729,9 +730,9 @@ class TestJob(RQTestCase): """ job.delete() deletes itself from Redis. """ - queue = Queue(connection=self.testconn) + queue = Queue(connection=self.testconn, serializer=JSONSerializer) dependency_job = queue.enqueue(fixtures.say_hello) - dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job) + dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job, serializer=JSONSerializer) dependent_job.register_dependency() dependent_job.save() @@ -811,6 +812,14 @@ class TestJob(RQTestCase): job.delete() self.assertNotIn(job, registry) + def test_create_and_cancel_job_with_serializer(self): + """test creating and using cancel_job (with serializer) deletes job properly""" + queue = Queue(connection=self.testconn, serializer=JSONSerializer) + job = queue.enqueue(fixtures.say_hello) + self.assertEqual(1, len(queue.get_jobs())) + cancel_job(job.id, serializer=JSONSerializer) + self.assertEqual(0, len(queue.get_jobs())) + def test_dependents_key_for_should_return_prefixed_job_id(self): """test redis key to store job dependents hash under""" job_id = 'random' diff --git a/tests/test_queue.py b/tests/test_queue.py index c03fd09..ce24a96 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import json from datetime import datetime, timedelta, timezone +from rq.serializers import DefaultSerializer, JSONSerializer from mock.mock import patch from rq import Retry, Queue @@ -145,7 +146,7 @@ class TestQueue(RQTestCase): def test_remove(self): """Ensure queue.remove properly removes Job from queue.""" - q = Queue('example') + q = Queue('example', serializer=JSONSerializer) job = q.enqueue(say_hello) self.assertIn(job.id, q.job_ids) q.remove(job) @@ -776,6 +777,22 @@ class TestQueue(RQTestCase): self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) + def test_getting_registries_with_serializer(self): + """Getting job registries from queue object (with custom serializer)""" + queue = Queue('example', serializer=JSONSerializer) + self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue)) + self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue)) + self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue)) + self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) + self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) + + # Make sure we don't use default when queue has custom + self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer) + self.assertEqual(queue.started_job_registry.serializer, JSONSerializer) + self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer) + self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer) + self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer) + def test_enqueue_with_retry(self): """Enqueueing with retry_strategy works""" queue = Queue('example', connection=self.testconn) diff --git a/tests/test_registry.py b/tests/test_registry.py index 07872b8..6e569db 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from datetime import datetime, timedelta +from rq.serializers import JSONSerializer from rq.compat import as_text from rq.defaults import DEFAULT_FAILURE_TTL @@ -34,10 +35,12 @@ class TestRegistry(RQTestCase): registry = StartedJobRegistry(queue=queue) self.assertEqual(registry.name, queue.name) self.assertEqual(registry.connection, queue.connection) + self.assertEqual(registry.serializer, queue.serializer) - registry = StartedJobRegistry('bar', self.testconn) + registry = StartedJobRegistry('bar', self.testconn, serializer=JSONSerializer) self.assertEqual(registry.name, 'bar') self.assertEqual(registry.connection, self.testconn) + self.assertEqual(registry.serializer, JSONSerializer) def test_key(self): self.assertEqual(self.registry.key, 'rq:wip:default') @@ -114,6 +117,17 @@ class TestRegistry(RQTestCase): self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) self.assertFalse(self.testconn.exists(job.key)) + def test_add_and_remove_with_serializer(self): + """Adding and removing job to StartedJobRegistry (with serializer).""" + # delete_job = True also works with job.id and custom serializer + queue = Queue(connection=self.testconn, serializer=JSONSerializer) + registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer) + job = queue.enqueue(say_hello) + registry.add(job, -1) + registry.remove(job.id, delete_job=True) + self.assertIsNone(self.testconn.zscore(registry.key, job.id)) + self.assertFalse(self.testconn.exists(job.key)) + def test_get_job_ids(self): """Getting job ids from StartedJobRegistry.""" timestamp = current_timestamp() @@ -225,14 +239,33 @@ class TestRegistry(RQTestCase): self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0) + def test_clean_registries_with_serializer(self): + """clean_registries() cleans Started and Finished job registries (with serializer).""" + + queue = Queue(connection=self.testconn, serializer=JSONSerializer) + + finished_job_registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer) + self.testconn.zadd(finished_job_registry.key, {'foo': 1}) + + started_job_registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer) + self.testconn.zadd(started_job_registry.key, {'foo': 1}) + + failed_job_registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer) + self.testconn.zadd(failed_job_registry.key, {'foo': 1}) + + clean_registries(queue) + self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0) + def test_get_queue(self): """registry.get_queue() returns the right Queue object.""" registry = StartedJobRegistry(connection=self.testconn) self.assertEqual(registry.get_queue(), Queue(connection=self.testconn)) - registry = StartedJobRegistry('foo', connection=self.testconn) + registry = StartedJobRegistry('foo', connection=self.testconn, serializer=JSONSerializer) self.assertEqual(registry.get_queue(), - Queue('foo', connection=self.testconn)) + Queue('foo', connection=self.testconn, serializer=JSONSerializer)) class TestFinishedJobRegistry(RQTestCase): @@ -396,6 +429,59 @@ class TestFailedJobRegistry(RQTestCase): job.refresh() self.assertEqual(job.get_status(), JobStatus.QUEUED) + def test_requeue_with_serializer(self): + """FailedJobRegistry.requeue works properly (with serializer)""" + queue = Queue(connection=self.testconn, serializer=JSONSerializer) + job = queue.enqueue(div_by_zero, failure_ttl=5) + + worker = Worker([queue], serializer=JSONSerializer) + worker.work(burst=True) + + registry = FailedJobRegistry(connection=worker.connection, serializer=JSONSerializer) + self.assertTrue(job in registry) + + registry.requeue(job.id) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertEqual(job.started_at, None) + self.assertEqual(job.ended_at, None) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # Should also work with job instance + registry.requeue(job) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # requeue_job should work the same way + requeue_job(job.id, connection=self.testconn, serializer=JSONSerializer) + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + + worker.work(burst=True) + self.assertTrue(job in registry) + + # And so does job.requeue() + job.requeue() + self.assertFalse(job in registry) + self.assertIn(job.id, queue.get_job_ids()) + + job.refresh() + self.assertEqual(job.get_status(), JobStatus.QUEUED) + def test_invalid_job(self): """Requeuing a job that's not in FailedJobRegistry raises an error.""" queue = Queue(connection=self.testconn)