Respect serializer (#1538)

* Add serializer where missing in code

* Fix cli

* Pass option to command

* Add tests for serializer option

* Merge branch 'master' into respect-serializer
- Update enqueue CLI to respect serializer

* Address @selwin's review
Branch: main
Josh Cohen authored 3 years ago · committed by GitHub
parent 246d52b977 · commit b80045d615
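Summary of the change: the serializer configured on a Queue is now threaded through the CLI (`rq enqueue -S rq.serializers.JSONSerializer …`, `rq requeue -S …`), the job helper functions, the job registries, the scheduler and the worker, so jobs stored with a non-default serializer can be read back everywhere. A minimal sketch of the round trip this enables (the `say_hello` task is a placeholder; in real use the function must live in a module the worker can import):

    from redis import Redis
    from rq import Queue, Worker
    from rq.job import Job
    from rq.serializers import JSONSerializer


    def say_hello():
        # Placeholder task; workers must be able to import it.
        return 'Hi there!'


    connection = Redis()
    queue = Queue('example', connection=connection, serializer=JSONSerializer)
    job = queue.enqueue(say_hello)

    # The worker and any later fetch must use the queue's serializer.
    Worker([queue], connection=connection, serializer=JSONSerializer).work(burst=True)
    print(Job.fetch(job.id, connection=connection, serializer=JSONSerializer).result)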

rq/cli/cli.py
@@ -126,11 +126,13 @@ def empty(cli_config, all, queues, **options):
 @click.option('--queue', required=True, type=str)
 @click.argument('job_ids', nargs=-1)
 @pass_cli_config
-def requeue(cli_config, queue, all, job_class, job_ids, **options):
+def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
     """Requeue failed jobs."""
     failed_job_registry = FailedJobRegistry(queue,
-                                            connection=cli_config.connection)
+                                            connection=cli_config.connection,
+                                            job_class=job_class,
+                                            serializer=serializer)
     if all:
         job_ids = failed_job_registry.get_job_ids()
@@ -321,7 +323,7 @@ def resume(cli_config, **options):
               multiple=True)
 @click.option('--job-id', help='The id of this job')
 @click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end')
-@click.option('--retry-max', help='Maximum amound of retries', default=0, type=int)
+@click.option('--retry-max', help='Maximum amount of retries', default=0, type=int)
 @click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0])
 @click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).')
 @click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '

@@ -331,10 +333,9 @@ def resume(cli_config, **options):
 @click.argument('arguments', nargs=-1)
 @pass_cli_config
 def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front,
-            retry_max, retry_interval, schedule_in, schedule_at, quiet, function, arguments, **options):
+            retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options):
     """Enqueues a job from the command line"""
     args, kwargs = parse_function_args(arguments)
     function_string = get_call_string(function, args, kwargs)
     description = description or function_string

@@ -345,7 +346,7 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio
     schedule = parse_schedule(schedule_in, schedule_at)
     with Connection(cli_config.connection):
-        queue = cli_config.queue_class(queue)
+        queue = cli_config.queue_class(queue, serializer=serializer)
         if schedule is None:
             job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,

rq/command.py
@@ -32,9 +32,9 @@ def send_kill_horse_command(connection, worker_name):
     send_command(connection, worker_name, 'kill-horse')

-def send_stop_job_command(connection, job_id):
+def send_stop_job_command(connection, job_id, serializer=None):
     """Instruct a worker to stop a job"""
-    job = Job.fetch(job_id, connection=connection)
+    job = Job.fetch(job_id, connection=connection, serializer=serializer)
     if not job.worker_name:
         raise InvalidJobOperation('Job is not currently executing')
     send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
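A brief usage sketch for the new `serializer` parameter of `send_stop_job_command` (connection and job id below are placeholders): when the job was enqueued with a non-default serializer, pass the same class so `Job.fetch` can deserialize it before the stop command is sent.

    from redis import Redis
    from rq.command import send_stop_job_command
    from rq.serializers import JSONSerializer

    connection = Redis()
    # Assumed: 'some-job-id' refers to a currently executing job that was
    # enqueued on a queue configured with JSONSerializer.
    send_stop_job_command(connection, 'some-job-id', serializer=JSONSerializer)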

rq/job.py
@@ -46,11 +46,11 @@ class JobStatus(str, Enum):
 UNEVALUATED = object()

-def cancel_job(job_id, connection=None):
+def cancel_job(job_id, connection=None, serializer=None):
     """Cancels the job with the given job ID, preventing execution. Discards
     any job info (i.e. it can't be requeued later).
     """
-    Job.fetch(job_id, connection=connection).cancel()
+    Job.fetch(job_id, connection=connection, serializer=serializer).cancel()

 def get_current_job(connection=None, job_class=None):

@@ -63,8 +63,8 @@ def get_current_job(connection=None, job_class=None):
     return _job_stack.top

-def requeue_job(job_id, connection):
-    job = Job.fetch(job_id, connection=connection)
+def requeue_job(job_id, connection, serializer=None):
+    job = Job.fetch(job_id, connection=connection, serializer=serializer)
     return job.requeue()
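`cancel_job` and `requeue_job` gain the same keyword, so both helpers fetch the job with the serializer it was stored with. A short sketch with placeholder job ids:

    from redis import Redis
    from rq.job import cancel_job, requeue_job
    from rq.serializers import JSONSerializer

    connection = Redis()
    # Cancel a queued job that was enqueued with JSONSerializer.
    cancel_job('queued-job-id', connection=connection, serializer=JSONSerializer)
    # Requeue a failed job stored with the same serializer.
    requeue_job('failed-job-id', connection=connection, serializer=JSONSerializer)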
@@ -689,12 +689,17 @@ class Job:
         from .registry import CanceledJobRegistry
         from .queue import Queue

-        q = Queue(name=self.origin, connection=self.connection)
+        q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
         q.remove(self, pipeline=pipeline)
         self.set_status(JobStatus.CANCELED, pipeline=pipeline)

-        registry = CanceledJobRegistry(self.origin, self.connection, job_class=self.__class__)
+        registry = CanceledJobRegistry(
+            self.origin,
+            self.connection,
+            job_class=self.__class__,
+            serializer=self.serializer
+        )
         registry.add(self, pipeline=pipeline)
         pipeline.execute()
@@ -711,35 +716,39 @@ class Job:
         if remove_from_queue:
             from .queue import Queue
-            q = Queue(name=self.origin, connection=self.connection)
+            q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
             q.remove(self, pipeline=pipeline)

         if self.is_finished:
             from .registry import FinishedJobRegistry
             registry = FinishedJobRegistry(self.origin,
                                            connection=self.connection,
-                                           job_class=self.__class__)
+                                           job_class=self.__class__,
+                                           serializer=self.serializer)
             registry.remove(self, pipeline=pipeline)

         elif self.is_deferred:
             from .registry import DeferredJobRegistry
             registry = DeferredJobRegistry(self.origin,
                                            connection=self.connection,
-                                           job_class=self.__class__)
+                                           job_class=self.__class__,
+                                           serializer=self.serializer)
             registry.remove(self, pipeline=pipeline)

         elif self.is_started:
             from .registry import StartedJobRegistry
             registry = StartedJobRegistry(self.origin,
                                           connection=self.connection,
-                                          job_class=self.__class__)
+                                          job_class=self.__class__,
+                                          serializer=self.serializer)
             registry.remove(self, pipeline=pipeline)

         elif self.is_scheduled:
             from .registry import ScheduledJobRegistry
             registry = ScheduledJobRegistry(self.origin,
                                             connection=self.connection,
-                                            job_class=self.__class__)
+                                            job_class=self.__class__,
+                                            serializer=self.serializer)
             registry.remove(self, pipeline=pipeline)

         elif self.is_failed:

@@ -748,7 +757,8 @@ class Job:
         elif self.is_canceled:
             from .registry import CanceledJobRegistry
             registry = CanceledJobRegistry(self.origin, connection=self.connection,
-                                           job_class=self.__class__)
+                                           job_class=self.__class__,
+                                           serializer=self.serializer)
             registry.remove(self, pipeline=pipeline)

         if delete_dependents:
@@ -849,13 +859,15 @@ class Job:
     def started_job_registry(self):
         from .registry import StartedJobRegistry
         return StartedJobRegistry(self.origin, connection=self.connection,
-                                  job_class=self.__class__)
+                                  job_class=self.__class__,
+                                  serializer=self.serializer)

     @property
     def failed_job_registry(self):
         from .registry import FailedJobRegistry
         return FailedJobRegistry(self.origin, connection=self.connection,
-                                 job_class=self.__class__)
+                                 job_class=self.__class__,
+                                 serializer=self.serializer)

     def get_retry_interval(self):
         """Returns the desired retry interval.

@@ -894,7 +906,8 @@ class Job:
         registry = DeferredJobRegistry(self.origin,
                                        connection=self.connection,
-                                       job_class=self.__class__)
+                                       job_class=self.__class__,
+                                       serializer=self.serializer)
         registry.add(self, pipeline=pipeline)

         connection = pipeline if pipeline is not None else self.connection

rq/queue.py
@@ -229,31 +229,32 @@ class Queue:
     def failed_job_registry(self):
         """Returns this queue's FailedJobRegistry."""
         from rq.registry import FailedJobRegistry
-        return FailedJobRegistry(queue=self, job_class=self.job_class)
+        return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)

     @property
     def started_job_registry(self):
         """Returns this queue's StartedJobRegistry."""
         from rq.registry import StartedJobRegistry
-        return StartedJobRegistry(queue=self, job_class=self.job_class)
+        return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)

     @property
     def finished_job_registry(self):
         """Returns this queue's FinishedJobRegistry."""
         from rq.registry import FinishedJobRegistry
-        return FinishedJobRegistry(queue=self)
+        # TODO: Why was job_class only ommited here before? Was it intentional?
+        return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)

     @property
     def deferred_job_registry(self):
         """Returns this queue's DeferredJobRegistry."""
         from rq.registry import DeferredJobRegistry
-        return DeferredJobRegistry(queue=self, job_class=self.job_class)
+        return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)

     @property
     def scheduled_job_registry(self):
         """Returns this queue's ScheduledJobRegistry."""
         from rq.registry import ScheduledJobRegistry
-        return ScheduledJobRegistry(queue=self, job_class=self.job_class)
+        return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)

     def remove(self, job_or_id, pipeline=None):
         """Removes Job from queue, accepts either a Job instance or ID."""

@@ -609,7 +610,8 @@
             for dependent in jobs_to_enqueue:
                 registry = DeferredJobRegistry(dependent.origin,
                                                self.connection,
-                                               job_class=self.job_class)
+                                               job_class=self.job_class,
+                                               serializer=self.serializer)
                 registry.remove(dependent, pipeline=pipe)
                 if dependent.origin == self.name:
                     self.enqueue_job(dependent, pipeline=pipe)

rq/registry.py
@@ -1,4 +1,5 @@
 import calendar
+from rq.serializers import resolve_serializer
 import time
 from datetime import datetime, timedelta, timezone

@@ -21,13 +22,15 @@ class BaseRegistry:
     key_template = 'rq:registry:{0}'

     def __init__(self, name='default', connection=None, job_class=None,
-                 queue=None):
+                 queue=None, serializer=None):
         if queue:
             self.name = queue.name
             self.connection = resolve_connection(queue.connection)
+            self.serializer = queue.serializer
         else:
             self.name = name
             self.connection = resolve_connection(connection)
+            self.serializer = resolve_serializer(serializer)
         self.key = self.key_template.format(self.name)
         self.job_class = backend_class(self, 'job_class', override=job_class)
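With this constructor change, a registry built from a queue inherits the queue's serializer, while a standalone registry resolves the explicit `serializer` argument (falling back to the default when none is given). A small sketch mirroring the new `test_init` assertions further down:

    from redis import Redis
    from rq import Queue
    from rq.registry import StartedJobRegistry
    from rq.serializers import JSONSerializer

    connection = Redis()
    queue = Queue('example', connection=connection, serializer=JSONSerializer)

    # Built from a queue: the registry picks up queue.serializer.
    registry = StartedJobRegistry(queue=queue)
    assert registry.serializer == queue.serializer

    # Built standalone: the serializer argument is resolved explicitly.
    registry = StartedJobRegistry('example', connection, serializer=JSONSerializer)
    assert registry.serializer == JSONSerializer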
@@ -77,7 +80,7 @@ class BaseRegistry:
             if isinstance(job, self.job_class):
                 job_instance = job
             else:
-                job_instance = Job.fetch(job_id, connection=connection)
+                job_instance = Job.fetch(job_id, connection=connection, serializer=self.serializer)
             job_instance.delete()
         return result

@@ -100,7 +103,7 @@ class BaseRegistry:
     def get_queue(self):
         """Returns Queue object associated with this registry."""
-        return Queue(self.name, connection=self.connection)
+        return Queue(self.name, connection=self.connection, serializer=self.serializer)

     def get_expiration_time(self, job):
         """Returns job's expiration time."""

@@ -111,8 +114,10 @@ class BaseRegistry:
         """Requeues the job with the given job ID."""
         if isinstance(job_or_id, self.job_class):
             job = job_or_id
+            serializer = job.serializer
         else:
-            job = self.job_class.fetch(job_or_id, connection=self.connection)
+            serializer = self.serializer
+            job = self.job_class.fetch(job_or_id, connection=self.connection, serializer=serializer)

         result = self.connection.zrem(self.key, job.id)
         if not result:

@@ -120,7 +125,7 @@ class BaseRegistry:
         with self.connection.pipeline() as pipeline:
             queue = Queue(job.origin, connection=self.connection,
-                          job_class=self.job_class)
+                          job_class=self.job_class, serializer=serializer)
             job.started_at = None
             job.ended_at = None
             job.exc_info = ''
@@ -152,13 +157,14 @@ class StartedJobRegistry(BaseRegistry):
         job_ids = self.get_expired_job_ids(score)

         if job_ids:
-            failed_job_registry = FailedJobRegistry(self.name, self.connection)
+            failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer)
             with self.connection.pipeline() as pipeline:
                 for job_id in job_ids:
                     try:
                         job = self.job_class.fetch(job_id,
-                                                   connection=self.connection)
+                                                   connection=self.connection,
+                                                   serializer=self.serializer)
                     except NoSuchJobError:
                         continue

@@ -326,14 +332,17 @@ def clean_registries(queue):
     """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue."""
     registry = FinishedJobRegistry(name=queue.name,
                                    connection=queue.connection,
-                                   job_class=queue.job_class)
+                                   job_class=queue.job_class,
+                                   serializer=queue.serializer)
     registry.cleanup()
     registry = StartedJobRegistry(name=queue.name,
                                   connection=queue.connection,
-                                  job_class=queue.job_class)
+                                  job_class=queue.job_class,
+                                  serializer=queue.serializer)
     registry.cleanup()
     registry = FailedJobRegistry(name=queue.name,
                                  connection=queue.connection,
-                                 job_class=queue.job_class)
+                                 job_class=queue.job_class,
+                                 serializer=queue.serializer)
     registry.cleanup()

rq/scheduler.py
@@ -129,7 +129,7 @@ class RQScheduler:
         queue_names = self._acquired_locks
         for name in queue_names:
             self._scheduled_job_registries.append(
-                ScheduledJobRegistry(name, connection=self.connection)
+                ScheduledJobRegistry(name, connection=self.connection, serializer=self.serializer)
             )

     @classmethod

rq/worker.py
@@ -932,7 +932,8 @@ class Worker:
             started_job_registry = StartedJobRegistry(
                 job.origin,
                 self.connection,
-                job_class=self.job_class
+                job_class=self.job_class,
+                serializer=self.serializer
             )

         job.worker_name = None

@@ -953,7 +954,7 @@ class Worker:
         if not self.disable_default_exception_handler and not retry:
             failed_job_registry = FailedJobRegistry(job.origin, job.connection,
-                                                    job_class=self.job_class)
+                                                    job_class=self.job_class, serializer=job.serializer)
             failed_job_registry.add(job, ttl=job.failure_ttl,
                                     exc_string=exc_string, pipeline=pipeline)
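With the worker-side change, a failed job is added to a FailedJobRegistry that carries the job's own serializer, so the failure can be inspected and requeued with the matching setting. A sketch of that failure path (the `div_by_zero` task is a placeholder, as in the tests below):

    from redis import Redis
    from rq import Queue, Worker
    from rq.registry import FailedJobRegistry
    from rq.serializers import JSONSerializer


    def div_by_zero():
        # Placeholder failing task; workers must be able to import it.
        return 1 / 0


    connection = Redis()
    queue = Queue('example', connection=connection, serializer=JSONSerializer)
    job = queue.enqueue(div_by_zero)
    Worker([queue], connection=connection, serializer=JSONSerializer).work(burst=True)

    # The registry has to be opened with the same serializer to see the failure.
    registry = FailedJobRegistry(queue.name, connection=connection, serializer=JSONSerializer)
    assert job in registry
    registry.requeue(job.id)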

tests/test_cli.py
@@ -168,6 +168,45 @@ class TestRQCli(RQTestCase):
         self.assertNotIn(job2, registry)
         self.assertNotIn(job3, registry)

+    def test_requeue_with_serializer(self):
+        """rq requeue -u <url> -S <serializer> --all"""
+        connection = Redis.from_url(self.redis_url)
+        queue = Queue('requeue', connection=connection, serializer=JSONSerializer)
+        registry = queue.failed_job_registry
+
+        runner = CliRunner()
+
+        job = queue.enqueue(div_by_zero)
+        job2 = queue.enqueue(div_by_zero)
+        job3 = queue.enqueue(div_by_zero)
+
+        worker = Worker([queue], serializer=JSONSerializer)
+        worker.work(burst=True)
+
+        self.assertIn(job, registry)
+        self.assertIn(job2, registry)
+        self.assertIn(job3, registry)
+
+        result = runner.invoke(
+            main,
+            ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id]
+        )
+        self.assert_normal_execution(result)
+
+        # Only the first specified job is requeued
+        self.assertNotIn(job, registry)
+        self.assertIn(job2, registry)
+        self.assertIn(job3, registry)
+
+        result = runner.invoke(
+            main,
+            ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all']
+        )
+        self.assert_normal_execution(result)
+
+        # With --all flag, all failed jobs are requeued
+        self.assertNotIn(job2, registry)
+        self.assertNotIn(job3, registry)
+
     def test_info(self):
         """rq info -u <url>"""
         runner = CliRunner()
@@ -398,6 +437,32 @@ class TestRQCli(RQTestCase):
         worker.work(True)
         self.assertEqual(Job(job_id).result, 'Hi there, Stranger!')

+    def test_cli_enqueue_with_serializer(self):
+        """rq enqueue -u <url> -S rq.serializers.JSONSerializer tests.fixtures.say_hello"""
+        queue = Queue(connection=self.connection, serializer=JSONSerializer)
+        self.assertTrue(queue.is_empty())
+
+        runner = CliRunner()
+        result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello'])
+        self.assert_normal_execution(result)
+
+        prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
+        suffix = '\'.\n'
+
+        print(result.stdout)
+        self.assertTrue(result.stdout.startswith(prefix))
+        self.assertTrue(result.stdout.endswith(suffix))
+
+        job_id = result.stdout[len(prefix):-len(suffix)]
+        queue_key = 'rq:queue:default'
+        self.assertEqual(self.connection.llen(queue_key), 1)
+        self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)
+
+        worker = Worker(queue, serializer=JSONSerializer)
+        worker.work(True)
+        self.assertEqual(Job(job_id, serializer=JSONSerializer).result, 'Hi there, Stranger!')
+
     def test_cli_enqueue_args(self):
         """rq enqueue -u <url> tests.fixtures.echo hello ':[1, {"key": "value"}]' json:=["abc"] nojson=def"""
         queue = Queue(connection=self.connection)

tests/test_commands.py
@@ -1,3 +1,4 @@
+from rq.serializers import JSONSerializer
 import time
 from multiprocessing import Process

@@ -63,17 +64,17 @@ class TestCommands(RQTestCase):
         """Ensure that stop_job command works properly."""
         connection = self.testconn
-        queue = Queue('foo', connection=connection)
+        queue = Queue('foo', connection=connection, serializer=JSONSerializer)
         job = queue.enqueue(long_running_job, 3)
-        worker = Worker('foo', connection=connection)
+        worker = Worker('foo', connection=connection, serializer=JSONSerializer)

         # If job is not executing, an error is raised
         with self.assertRaises(InvalidJobOperation):
-            send_stop_job_command(connection, job_id=job.id)
+            send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)

         # An exception is raised if job ID is invalid
         with self.assertRaises(NoSuchJobError):
-            send_stop_job_command(connection, job_id='1')
+            send_stop_job_command(connection, job_id='1', serializer=JSONSerializer)

         def start_work():
             worker.work(burst=True)

@@ -90,7 +91,7 @@ class TestCommands(RQTestCase):
         worker.refresh()
         self.assertEqual(worker.get_state(), WorkerStatus.BUSY)
-        send_stop_job_command(connection, job_id=job.id)
+        send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)

         time.sleep(0.25)

         # Job status is set appropriately

tests/test_job.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import json
+from rq.serializers import JSONSerializer
 import time
 import queue
 import zlib

@@ -596,9 +597,9 @@ class TestJob(RQTestCase):
         Wthout a save, the dependent job is never saved into redis. The delete
         method will get and pass a NoSuchJobError.
         """
-        queue = Queue(connection=self.testconn)
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
         job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job)
+        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
         job2.register_dependency()

         job.delete()
@@ -614,49 +615,49 @@ class TestJob(RQTestCase):
     def test_job_delete_removes_itself_from_registries(self):
         """job.delete() should remove itself from job registries"""
         job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED,
-                         connection=self.testconn, origin='default')
+                         connection=self.testconn, origin='default', serializer=JSONSerializer)
         job.save()
-        registry = FailedJobRegistry(connection=self.testconn)
+        registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
         registry.add(job, 500)

         job.delete()
         self.assertFalse(job in registry)

         job = Job.create(func=fixtures.say_hello, status=JobStatus.FINISHED,
-                         connection=self.testconn, origin='default')
+                         connection=self.testconn, origin='default', serializer=JSONSerializer)
         job.save()
-        registry = FinishedJobRegistry(connection=self.testconn)
+        registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
         registry.add(job, 500)

         job.delete()
         self.assertFalse(job in registry)

         job = Job.create(func=fixtures.say_hello, status=JobStatus.STARTED,
-                         connection=self.testconn, origin='default')
+                         connection=self.testconn, origin='default', serializer=JSONSerializer)
         job.save()
-        registry = StartedJobRegistry(connection=self.testconn)
+        registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
         registry.add(job, 500)

         job.delete()
         self.assertFalse(job in registry)

         job = Job.create(func=fixtures.say_hello, status=JobStatus.DEFERRED,
-                         connection=self.testconn, origin='default')
+                         connection=self.testconn, origin='default', serializer=JSONSerializer)
         job.save()
-        registry = DeferredJobRegistry(connection=self.testconn)
+        registry = DeferredJobRegistry(connection=self.testconn, serializer=JSONSerializer)
         registry.add(job, 500)

         job.delete()
         self.assertFalse(job in registry)

         job = Job.create(func=fixtures.say_hello, status=JobStatus.SCHEDULED,
-                         connection=self.testconn, origin='default')
+                         connection=self.testconn, origin='default', serializer=JSONSerializer)
         job.save()
-        registry = ScheduledJobRegistry(connection=self.testconn)
+        registry = ScheduledJobRegistry(connection=self.testconn, serializer=JSONSerializer)
         registry.add(job, 500)

         job.delete()
@@ -665,9 +666,9 @@ class TestJob(RQTestCase):
     def test_job_with_dependents_delete_parent_with_saved(self):
         """job.delete() deletes itself from Redis but not dependents. If the
         dependent job was saved, it will remain in redis."""
-        queue = Queue(connection=self.testconn)
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
         job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job)
+        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
         job2.register_dependency()
         job2.save()

@@ -683,10 +684,10 @@ class TestJob(RQTestCase):
     def test_job_with_dependents_deleteall(self):
         """job.delete() deletes itself from Redis. Dependents need to be
-        deleted explictely."""
-        queue = Queue(connection=self.testconn)
+        deleted explicitly."""
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
         job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job)
+        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
         job2.register_dependency()

         job.delete(delete_dependents=True)

@@ -701,9 +702,9 @@ class TestJob(RQTestCase):
         deleted explictely. Without a save, the dependent job is never saved
         into redis. The delete method will get and pass a NoSuchJobError.
         """
-        queue = Queue(connection=self.testconn)
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
         job = queue.enqueue(fixtures.say_hello)
-        job2 = Job.create(func=fixtures.say_hello, depends_on=job)
+        job2 = Job.create(func=fixtures.say_hello, depends_on=job, serializer=JSONSerializer)
         job2.register_dependency()
         job2.save()

@@ -729,9 +730,9 @@ class TestJob(RQTestCase):
         """
         job.delete() deletes itself from Redis.
         """
-        queue = Queue(connection=self.testconn)
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
         dependency_job = queue.enqueue(fixtures.say_hello)
-        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
+        dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job, serializer=JSONSerializer)
         dependent_job.register_dependency()
         dependent_job.save()
@@ -811,6 +812,14 @@ class TestJob(RQTestCase):
         job.delete()
         self.assertNotIn(job, registry)

+    def test_create_and_cancel_job_with_serializer(self):
+        """test creating and using cancel_job (with serializer) deletes job properly"""
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
+        job = queue.enqueue(fixtures.say_hello)
+        self.assertEqual(1, len(queue.get_jobs()))
+        cancel_job(job.id, serializer=JSONSerializer)
+        self.assertEqual(0, len(queue.get_jobs()))
+
     def test_dependents_key_for_should_return_prefixed_job_id(self):
         """test redis key to store job dependents hash under"""
         job_id = 'random'

tests/test_queue.py
@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 import json
 from datetime import datetime, timedelta, timezone
+from rq.serializers import DefaultSerializer, JSONSerializer
 from mock.mock import patch

 from rq import Retry, Queue

@@ -145,7 +146,7 @@ class TestQueue(RQTestCase):
     def test_remove(self):
         """Ensure queue.remove properly removes Job from queue."""
-        q = Queue('example')
+        q = Queue('example', serializer=JSONSerializer)
         job = q.enqueue(say_hello)
         self.assertIn(job.id, q.job_ids)
         q.remove(job)

@@ -776,6 +777,22 @@ class TestQueue(RQTestCase):
         self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
         self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))

+    def test_getting_registries_with_serializer(self):
+        """Getting job registries from queue object (with custom serializer)"""
+        queue = Queue('example', serializer=JSONSerializer)
+        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
+        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
+        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
+        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
+        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
+
+        # Make sure we don't use default when queue has custom
+        self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer)
+        self.assertEqual(queue.started_job_registry.serializer, JSONSerializer)
+        self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer)
+        self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer)
+        self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer)
+
     def test_enqueue_with_retry(self):
         """Enqueueing with retry_strategy works"""
         queue = Queue('example', connection=self.testconn)

tests/test_registry.py
@@ -2,6 +2,7 @@
 from __future__ import absolute_import

 from datetime import datetime, timedelta
+from rq.serializers import JSONSerializer

 from rq.compat import as_text
 from rq.defaults import DEFAULT_FAILURE_TTL

@@ -34,10 +35,12 @@ class TestRegistry(RQTestCase):
         registry = StartedJobRegistry(queue=queue)
         self.assertEqual(registry.name, queue.name)
         self.assertEqual(registry.connection, queue.connection)
+        self.assertEqual(registry.serializer, queue.serializer)

-        registry = StartedJobRegistry('bar', self.testconn)
+        registry = StartedJobRegistry('bar', self.testconn, serializer=JSONSerializer)
         self.assertEqual(registry.name, 'bar')
         self.assertEqual(registry.connection, self.testconn)
+        self.assertEqual(registry.serializer, JSONSerializer)

     def test_key(self):
         self.assertEqual(self.registry.key, 'rq:wip:default')

@@ -114,6 +117,17 @@ class TestRegistry(RQTestCase):
         self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
         self.assertFalse(self.testconn.exists(job.key))

+    def test_add_and_remove_with_serializer(self):
+        """Adding and removing job to StartedJobRegistry (with serializer)."""
+        # delete_job = True also works with job.id and custom serializer
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
+        registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
+
+        job = queue.enqueue(say_hello)
+        registry.add(job, -1)
+
+        registry.remove(job.id, delete_job=True)
+        self.assertIsNone(self.testconn.zscore(registry.key, job.id))
+        self.assertFalse(self.testconn.exists(job.key))
+
     def test_get_job_ids(self):
         """Getting job ids from StartedJobRegistry."""
         timestamp = current_timestamp()

@@ -225,14 +239,33 @@ class TestRegistry(RQTestCase):
         self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
         self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)

+    def test_clean_registries_with_serializer(self):
+        """clean_registries() cleans Started and Finished job registries (with serializer)."""
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
+
+        finished_job_registry = FinishedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
+        self.testconn.zadd(finished_job_registry.key, {'foo': 1})
+
+        started_job_registry = StartedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
+        self.testconn.zadd(started_job_registry.key, {'foo': 1})
+
+        failed_job_registry = FailedJobRegistry(connection=self.testconn, serializer=JSONSerializer)
+        self.testconn.zadd(failed_job_registry.key, {'foo': 1})
+
+        clean_registries(queue)
+        self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
+        self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
+        self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)
+
     def test_get_queue(self):
         """registry.get_queue() returns the right Queue object."""
         registry = StartedJobRegistry(connection=self.testconn)
         self.assertEqual(registry.get_queue(), Queue(connection=self.testconn))

-        registry = StartedJobRegistry('foo', connection=self.testconn)
+        registry = StartedJobRegistry('foo', connection=self.testconn, serializer=JSONSerializer)
         self.assertEqual(registry.get_queue(),
-                         Queue('foo', connection=self.testconn))
+                         Queue('foo', connection=self.testconn, serializer=JSONSerializer))


 class TestFinishedJobRegistry(RQTestCase):
@@ -396,6 +429,59 @@ class TestFailedJobRegistry(RQTestCase):
         job.refresh()
         self.assertEqual(job.get_status(), JobStatus.QUEUED)

+    def test_requeue_with_serializer(self):
+        """FailedJobRegistry.requeue works properly (with serializer)"""
+        queue = Queue(connection=self.testconn, serializer=JSONSerializer)
+        job = queue.enqueue(div_by_zero, failure_ttl=5)
+
+        worker = Worker([queue], serializer=JSONSerializer)
+        worker.work(burst=True)
+
+        registry = FailedJobRegistry(connection=worker.connection, serializer=JSONSerializer)
+        self.assertTrue(job in registry)
+
+        registry.requeue(job.id)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+        self.assertEqual(job.started_at, None)
+        self.assertEqual(job.ended_at, None)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # Should also work with job instance
+        registry.requeue(job)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # requeue_job should work the same way
+        requeue_job(job.id, connection=self.testconn, serializer=JSONSerializer)
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+        worker.work(burst=True)
+        self.assertTrue(job in registry)
+
+        # And so does job.requeue()
+        job.requeue()
+        self.assertFalse(job in registry)
+        self.assertIn(job.id, queue.get_job_ids())
+
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
     def test_invalid_job(self):
         """Requeuing a job that's not in FailedJobRegistry raises an error."""
         queue = Queue(connection=self.testconn)
