Reliable queue (#1911)

* Use lmove() when working on a single queue

* Skip reliable queue tests if Redis server doesn't support LMOVE

* Better test coverage

* job.origin should be string

* Added test for job that gets orphaned if worker.execute_job() fails

* Fix job tests

* worker.run_maintenance_tasks() now cleans intermediate queues

* Fixed import ordering

* No need to run slow tests and flake8 on SSL tests

* Minor typing fixes

* Fixed linting
main
Selwin Ong 2 years ago committed by GitHub
parent 107221fd9e
commit 37ddcb51cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,2 +1,2 @@
redis>=3.5.0
redis>=4.0.0
click>=5.0.0

@ -1,5 +1,5 @@
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union
if TYPE_CHECKING:
from redis import Redis
@ -21,9 +21,9 @@ class job: # noqa
timeout: Optional[int] = None,
result_ttl: int = DEFAULT_RESULT_TTL,
ttl: Optional[int] = None,
queue_class: Optional['Queue'] = None,
queue_class: Optional[Type['Queue']] = None,
depends_on: Optional[List[Any]] = None,
at_front: Optional[bool] = None,
at_front: bool = False,
meta: Optional[Dict[Any, Any]] = None,
description: Optional[str] = None,
failure_ttl: Optional[int] = None,

@ -153,7 +153,7 @@ class Job:
depends_on: Optional[JobDependencyType] = None,
timeout: Optional[int] = None,
id: Optional[str] = None,
origin=None,
origin: str = '',
meta: Optional[Dict[str, Any]] = None,
failure_ttl: Optional[int] = None,
serializer=None,
@ -221,7 +221,7 @@ class Job:
if id is not None:
job.set_id(id)
if origin is not None:
if origin:
job.origin = origin
# Set the core job tuple properties
@ -504,7 +504,7 @@ class Job:
self._data = UNEVALUATED
@property
def args(self):
def args(self) -> tuple:
if self._args is UNEVALUATED:
self._deserialize_data()
return self._args
@ -608,7 +608,7 @@ class Job:
self._failure_callback_name = None
self._failure_callback = UNEVALUATED
self.description: Optional[str] = None
self.origin: Optional[str] = None
self.origin: str = ''
self.enqueued_at: Optional[datetime] = None
self.started_at: Optional[datetime] = None
self.ended_at: Optional[datetime] = None
@ -623,7 +623,7 @@ class Job:
self.worker_name: Optional[str] = None
self._status = None
self._dependency_ids: List[str] = []
self.meta: Optional[Dict] = {}
self.meta: Dict = {}
self.serializer = resolve_serializer(serializer)
self.retries_left: Optional[int] = None
self.retry_intervals: Optional[List[int]] = None
@ -883,7 +883,7 @@ class Job:
self.data = raw_data
self.created_at = str_to_date(obj.get('created_at'))
self.origin = as_text(obj.get('origin')) if obj.get('origin') else None
self.origin = as_text(obj.get('origin')) if obj.get('origin') else ''
self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None
self.description = as_text(obj.get('description')) if obj.get('description') else None
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
@ -977,7 +977,7 @@ class Job:
obj['retries_left'] = self.retries_left
if self.retry_intervals is not None:
obj['retry_intervals'] = json.dumps(self.retry_intervals)
if self.origin is not None:
if self.origin:
obj['origin'] = self.origin
if self.description is not None:
obj['description'] = self.description

@ -0,0 +1,25 @@
from typing import TYPE_CHECKING
from .queue import Queue
from .utils import as_text
if TYPE_CHECKING:
from .worker import BaseWorker
def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
    """Fail and remove jobs that are stuck in ``queue``'s intermediate queue.

    A job may be stuck in the intermediate queue if a worker has successfully
    dequeued a job but was not able to push it to the StartedJobRegistry.
    This may happen in rare cases of hardware or network failure.

    We consider a job to be stuck in the intermediate queue if it doesn't
    exist in the StartedJobRegistry.

    Args:
        worker (BaseWorker): Worker whose ``handle_job_failure()`` is used to
            mark each stuck job as failed.
        queue (Queue): Queue whose intermediate queue is inspected.
    """
    connection = queue.connection
    intermediate_key = queue.intermediate_queue_key
    # Look the registry up once instead of on every iteration.
    started_registry = queue.started_job_registry

    for raw_job_id in connection.lrange(intermediate_key, 0, -1):
        job_id = as_text(raw_job_id)
        if job_id in started_registry:
            # The job made it to the StartedJobRegistry, so it is not stuck.
            continue

        job = queue.fetch_job(job_id)
        # The job data may no longer exist (e.g. it was deleted or expired
        # while its ID was still sitting in the intermediate queue). There is
        # nothing to fail in that case, but the stale ID must still be removed.
        if job is not None:
            worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.')
        connection.lrem(intermediate_key, 1, job_id)

@ -138,6 +138,18 @@ class Queue:
death_penalty_class=death_penalty_class,
)
@classmethod
def get_intermediate_queue_key(cls, key: str) -> str:
    """Build the intermediate queue key that belongs to a queue key.

    Args:
        key (str): The queue key

    Returns:
        str: The queue key with an ``:intermediate`` suffix appended
    """
    intermediate_key = f'{key}:intermediate'
    return intermediate_key
def __init__(
self,
name: str = 'default',
@ -209,6 +221,11 @@ class Queue:
"""Returns the Redis key for this Queue."""
return self._key
@property
def intermediate_queue_key(self):
    """The Redis key of this queue's intermediate queue, derived from the queue's own key."""
    queue_key = self._key
    return self.get_intermediate_queue_key(queue_key)
@property
def registry_cleaning_key(self):
"""Redis key used to indicate this queue has been cleaned."""
@ -221,7 +238,7 @@ class Queue:
pid = self.connection.get(RQScheduler.get_locking_key(self.name))
return int(pid.decode()) if pid is not None else None
def acquire_cleaning_lock(self) -> bool:
def acquire_maintenance_lock(self) -> bool:
"""Returns a boolean indicating whether a lock to clean this queue
is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
@ -290,7 +307,7 @@ class Queue:
return self.count == 0
@property
def is_async(self):
def is_async(self) -> bool:
"""Returns whether the current queue is async."""
return bool(self._is_async)
@ -1175,8 +1192,8 @@ class Queue:
@classmethod
def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
"""Helper method to abstract away from some Redis API details
where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
iterate over all queues, do individual LPOPs, and return the result.
@ -1207,7 +1224,7 @@ class Queue:
logger.debug(f"Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}")
result = connection.blpop(queue_keys, timeout)
if result is None:
logger.debug(f"BLPOP Timeout, no jobs found on queues {colored_queues}")
logger.debug(f"BLPOP timeout, no jobs found on queues {colored_queues}")
raise DequeueTimeout(timeout, queue_keys)
queue_key, job_id = result
return queue_key, job_id
@ -1218,6 +1235,27 @@ class Queue:
return queue_key, blob
return None
@classmethod
def lmove(cls, connection: 'Redis', queue_key: str, timeout: Optional[int]):
    """Pop one item from a single queue and push it onto that queue's intermediate queue.

    Works like `lpop`, except that only one queue key is accepted and the
    popped item is moved to the intermediate queue instead of being dropped
    from Redis.

    Args:
        connection (Redis): The Redis connection to issue the command on.
        queue_key (str): Key of the queue to pop from.
        timeout (Optional[int]): `None` selects the non-blocking LMOVE;
            any other value selects the blocking BLMOVE with that timeout.

    Returns:
        A `(queue_key, item)` tuple, or `None` when the non-blocking variant
        finds nothing to move.

    Raises:
        ValueError: If `timeout` is 0.
        DequeueTimeout: If the blocking variant times out without a job.
    """
    if timeout is None:
        # Non-blocking variant
        moved = connection.lmove(queue_key, cls.get_intermediate_queue_key(queue_key))
        if moved is None:
            return None
        return queue_key, moved

    # Blocking variant
    if timeout == 0:
        raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')

    colored_queue = green(queue_key)
    logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}")
    moved = connection.blmove(queue_key, cls.get_intermediate_queue_key(queue_key), timeout)
    if moved is not None:
        return queue_key, moved

    logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}")
    raise DequeueTimeout(timeout, queue_key)
@classmethod
def dequeue_any(
cls,
@ -1256,6 +1294,9 @@ class Queue:
while True:
queue_keys = [q.key for q in queues]
if len(queue_keys) == 1 and get_version(connection) >= (6, 2, 0):
result = cls.lmove(connection, queue_keys[0], timeout)
else:
result = cls.lpop(queue_keys, timeout, connection=connection)
if result is None:
return None

@ -66,7 +66,7 @@ class BaseRegistry:
and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
)
def __contains__(self, item: Union[str, 'Job']):
def __contains__(self, item: Union[str, 'Job']) -> bool:
"""
Returns a boolean indicating registry contains the given
job instance or job id.

@ -48,22 +48,14 @@ from .defaults import (
from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import blue, green, setup_loghandlers, yellow
from .maintenance import clean_intermediate_queue
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import HorseMonitorTimeoutException, JobTimeoutException, UnixSignalDeathPenalty
from .utils import (
as_text,
backend_class,
compact,
ensure_list,
get_version,
utcformat,
utcnow,
utcparse,
)
from .utils import as_text, backend_class, compact, ensure_list, get_version, utcformat, utcnow, utcparse
from .version import VERSION
try:
@ -294,6 +286,38 @@ class BaseWorker:
return True
return False
def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
    """Make sure the Redis connection has a socket timeout and return it.

    The socket timeout should make the connection time out if a specific
    command hangs (eg. BLPOP). A `socket_timeout` that is already defined on
    the connection pool is left untouched; otherwise it is set to
    `self.connection_timeout`.

    Args:
        connection (Optional[Redis]): The Redis Connection. When `None`,
            the result of `get_current_connection()` is used.

    Returns:
        Redis: The connection that was configured.
    """
    if connection is None:
        connection = get_current_connection()

    pool_kwargs = connection.connection_pool.connection_kwargs
    if pool_kwargs.get("socket_timeout") is None:
        pool_kwargs["socket_timeout"] = self.connection_timeout
    return connection
@property
def dequeue_timeout(self) -> int:
    """`worker_ttl` minus 15, clamped so the result is never lower than 1."""
    remaining = self.worker_ttl - 15
    return remaining if remaining > 1 else 1
def clean_registries(self):
    """Run the maintenance tasks for every queue this worker holds the maintenance lock for.

    For each such queue this cleans the job registries, the worker registry
    and the intermediate queue, then records the time of the run in
    `last_cleaned_at`.
    """
    for queue in self.queues:
        # With multiple workers running, only the one that acquires the
        # lock should do the cleaning for this queue.
        if not queue.acquire_maintenance_lock():
            continue

        self.log.info('Cleaning registries for queue: %s', queue.name)
        clean_registries(queue)
        worker_registration.clean_worker_registry(queue)
        clean_intermediate_queue(self, queue)
    self.last_cleaned_at = utcnow()
def get_redis_server_version(self):
"""Return Redis server version of connection"""
if not self.redis_server_version:
@ -683,22 +707,6 @@ class Worker(BaseWorker):
worker.refresh()
return worker
def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
    """Configures the Redis connection to have a socket timeout.
    This should time out the connection in case any specific command hangs at any given time (eg. BLPOP).
    If the connection provided already has a `socket_timeout` defined, skips.

    Args:
        connection (Optional[Redis]): The Redis Connection.

    Returns:
        Redis: The given connection, or the result of `get_current_connection()`
            when `connection` is None.
    """
    if connection is None:
        connection = get_current_connection()
    current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
    if current_socket_timeout is None:
        timeout_config = {"socket_timeout": self.connection_timeout}
        connection.connection_pool.connection_kwargs.update(timeout_config)
    return connection
@property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
@ -711,10 +719,6 @@ class Worker(BaseWorker):
"""Returns whether or not this is the worker or the work horse."""
return self._is_horse
@property
def dequeue_timeout(self) -> int:
return max(1, self.worker_ttl - 15)
@property
def connection_timeout(self) -> int:
return self.dequeue_timeout + 10
@ -1263,7 +1267,7 @@ class Worker(BaseWorker):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
def prepare_job_execution(self, job: 'Job'):
def prepare_job_execution(self, job: 'Job', remove_from_intermediate_queue: bool = False):
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
@ -1277,6 +1281,11 @@ class Worker(BaseWorker):
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
job.prepare_for_execution(self.name, pipeline=pipeline)
if remove_from_intermediate_queue:
from .queue import Queue
queue = Queue(job.origin, connection=self.connection)
pipeline.lrem(queue.intermediate_queue_key, 1, job.id)
pipeline.execute()
self.log.debug('Job preparation finished.')
@ -1407,7 +1416,8 @@ class Worker(BaseWorker):
self.log.debug('Started Job Registry set.')
try:
self.prepare_job_execution(job)
remove_from_intermediate_queue = len(self.queues) == 1
self.prepare_job_execution(job, remove_from_intermediate_queue)
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
@ -1524,17 +1534,6 @@ class Worker(BaseWorker):
"""The hash does not take the database/connection into account"""
return hash(self.name)
def clean_registries(self):
"""Runs maintenance jobs on each Queue's registries."""
for queue in self.queues:
# If there are multiple workers running, we only want 1 worker
# to run clean_registries().
if queue.acquire_cleaning_lock():
self.log.info('Cleaning registries for queue: %s', queue.name)
clean_registries(queue)
worker_registration.clean_worker_registry(queue)
self.last_cleaned_at = utcnow()
def handle_payload(self, message):
"""Handle external commands"""
self.log.debug('Received message: %s', message)

@ -49,7 +49,7 @@ class TestJob(RQTestCase):
self.assertEqual(str(job), "<Job %s: test job>" % job.id)
# ...and nothing else
self.assertIsNone(job.origin)
self.assertEqual(job.origin, '')
self.assertIsNone(job.enqueued_at)
self.assertIsNone(job.started_at)
self.assertIsNone(job.ended_at)
@ -87,7 +87,7 @@ class TestJob(RQTestCase):
self.assertEqual(job.kwargs, {'z': 2})
# ...but metadata is not
self.assertIsNone(job.origin)
self.assertEqual(job.origin, '')
self.assertIsNone(job.enqueued_at)
self.assertIsNone(job.result)

@ -0,0 +1,36 @@
import unittest
from unittest.mock import patch
from redis import Redis
from rq.job import JobStatus
from rq.maintenance import clean_intermediate_queue
from rq.queue import Queue
from rq.utils import get_version
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import say_hello
class MaintenanceTestCase(RQTestCase):
    """Tests for `clean_intermediate_queue` from `rq.maintenance`."""

    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
    def test_cleanup_intermediate_queue(self):
        """Ensure jobs stuck in the intermediate queue are cleaned up."""
        queue = Queue('foo', connection=self.testconn)
        job = queue.enqueue(say_hello)

        # If job execution fails after it's dequeued, the job should be in the
        # intermediate queue and its status is still QUEUED
        with patch.object(Worker, 'execute_job'):
            # execute_job is replaced by a mock, so the worker dequeues the job
            # but never actually executes it
            worker = Worker(queue, connection=self.testconn)
            worker.work(burst=True)

            self.assertEqual(job.get_status(), JobStatus.QUEUED)
            self.assertFalse(job.id in queue.get_job_ids())
            self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
            # After cleaning up the intermediate queue, job status should be `FAILED`
            # and job is also removed from the intermediate queue
            clean_intermediate_queue(worker, queue)
            self.assertEqual(job.get_status(), JobStatus.FAILED)
            self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))

@ -1,7 +1,10 @@
import json
import unittest
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from redis import Redis
from rq import Queue, Retry
from rq.job import Job, JobStatus
from rq.registry import (
@ -13,6 +16,7 @@ from rq.registry import (
StartedJobRegistry,
)
from rq.serializers import JSONSerializer
from rq.utils import get_version
from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import echo, say_hello
@ -226,8 +230,10 @@ class TestQueue(RQTestCase):
def test_dequeue_any(self):
"""Fetching work from any given queue."""
fooq = Queue('foo')
barq = Queue('bar')
fooq = Queue('foo', connection=self.testconn)
barq = Queue('bar', connection=self.testconn)
self.assertRaises(ValueError, Queue.dequeue_any, [fooq, barq], timeout=0, connection=self.testconn)
self.assertEqual(Queue.dequeue_any([fooq, barq], None), None)
@ -253,6 +259,47 @@ class TestQueue(RQTestCase):
self.assertEqual(job.origin, barq.name)
self.assertEqual(job.args[0], 'for Bar', 'Bar should be dequeued second.')
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
def test_dequeue_any_reliable(self):
    """Dequeueing job from a single queue moves job to intermediate queue."""
    foo_queue = Queue('foo', connection=self.testconn)
    job_1 = foo_queue.enqueue(say_hello)
    # A timeout of 0 is rejected for the single queue code path as well
    self.assertRaises(ValueError, Queue.dequeue_any, [foo_queue], timeout=0, connection=self.testconn)

    # Job ID is not in intermediate queue
    self.assertIsNone(self.testconn.lpos(foo_queue.intermediate_queue_key, job_1.id))
    # Test the non-blocking version (timeout=None)
    job, queue = Queue.dequeue_any([foo_queue], timeout=None, connection=self.testconn)
    self.assertEqual(queue, foo_queue)
    self.assertEqual(job.func, say_hello)
    # After job is dequeued, the job ID is in the intermediate queue
    self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 0)

    # Test the blocking version
    foo_queue.enqueue(say_hello)
    job, queue = Queue.dequeue_any([foo_queue], timeout=1, connection=self.testconn)
    self.assertEqual(queue, foo_queue)
    self.assertEqual(job.func, say_hello)
    # After job is dequeued, the job ID is in the intermediate queue. The first
    # job's ID was never removed, so the second one is found at index 1
    self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 1)
@unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
def test_intermediate_queue(self):
    """Job should be stuck in intermediate queue if execution fails after dequeued."""
    queue = Queue('foo', connection=self.testconn)
    job = queue.enqueue(say_hello)

    # If job execution fails after it's dequeued, the job should be in the
    # intermediate queue and its status is still QUEUED
    with patch.object(Worker, 'execute_job'):
        # execute_job is replaced by a mock, so the worker dequeues the job
        # but never actually executes it
        worker = Worker(queue, connection=self.testconn)
        worker.work(burst=True)

        # Job status is still QUEUED even though it's already dequeued
        self.assertEqual(job.get_status(refresh=True), JobStatus.QUEUED)
        self.assertFalse(job.id in queue.get_job_ids())
        self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
def test_dequeue_any_ignores_nonexisting_jobs(self):
"""Dequeuing (from any queue) silently ignores non-existing jobs."""
@ -324,6 +371,7 @@ class TestQueue(RQTestCase):
def test_synchronous_timeout(self):
queue = Queue(is_async=False)
self.assertFalse(queue.is_async)
no_expire_job = queue.enqueue(echo, result_ttl=-1)
self.assertEqual(queue.connection.ttl(no_expire_job.key), -1)
@ -683,7 +731,7 @@ class TestQueue(RQTestCase):
"""Fetch a job from a queue."""
q = Queue('example')
job_orig = q.enqueue(say_hello)
job_fetch = q.fetch_job(job_orig.id)
job_fetch: Job = q.fetch_job(job_orig.id) # type: ignore
self.assertIsNotNone(job_fetch)
self.assertEqual(job_orig.id, job_fetch.id)
self.assertEqual(job_orig.description, job_fetch.description)

@ -15,6 +15,7 @@ from unittest.mock import Mock
import psutil
import pytest
import redis.exceptions
from redis import Redis
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
@ -23,7 +24,7 @@ from rq.registry import FailedJobRegistry, FinishedJobRegistry, StartedJobRegist
from rq.results import Result
from rq.serializers import JSONSerializer
from rq.suspension import resume, suspend
from rq.utils import as_text, utcnow
from rq.utils import as_text, get_version, utcnow
from rq.version import VERSION
from rq.worker import HerokuWorker, RandomWorker, RoundRobinWorker, WorkerStatus
from tests import RQTestCase, slow
@ -799,6 +800,28 @@ class TestWorker(RQTestCase):
self.assertEqual(job._status, JobStatus.STARTED)
self.assertEqual(job.worker_name, worker.name)
@skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
def test_prepare_job_execution_removes_key_from_intermediate_queue(self):
    """Prepare job execution removes job from intermediate queue."""
    queue = Queue(connection=self.testconn)
    job = queue.enqueue(say_hello)

    # Dequeueing from a single queue puts the job ID in the intermediate queue
    Queue.dequeue_any([queue], timeout=None, connection=self.testconn)
    self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))

    # Removal only happens when it is explicitly requested via the flag
    worker = Worker([queue])
    worker.prepare_job_execution(job, remove_from_intermediate_queue=True)
    self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
    self.assertEqual(queue.count, 0)
@skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
def test_work_removes_key_from_intermediate_queue(self):
    """Worker removes job from intermediate queue."""
    queue = Queue(connection=self.testconn)
    job = queue.enqueue(say_hello)
    worker = Worker([queue])
    worker.work(burst=True)
    # Once the burst run is over, the job ID must not be left behind
    # in the intermediate queue
    self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
def test_work_unicode_friendly(self):
"""Worker processes work with unicode description, then quits."""
q = Queue('foo')

@ -11,7 +11,6 @@ deps=
psutil
passenv=
RUN_SSL_TESTS
RUN_SLOW_TESTS_TOO
[testenv:lint]
basepython = python3.10

Loading…
Cancel
Save