diff --git a/requirements.txt b/requirements.txt
index f499594..3a14dff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis>=3.5.0
+redis>=4.0.0
 click>=5.0.0
diff --git a/rq/decorators.py b/rq/decorators.py
index a24101e..41b2da4 100644
--- a/rq/decorators.py
+++ b/rq/decorators.py
@@ -1,5 +1,5 @@
 from functools import wraps
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Type, Union
 
 if TYPE_CHECKING:
     from redis import Redis
@@ -21,9 +21,9 @@ class job:  # noqa
         timeout: Optional[int] = None,
         result_ttl: int = DEFAULT_RESULT_TTL,
         ttl: Optional[int] = None,
-        queue_class: Optional['Queue'] = None,
+        queue_class: Optional[Type['Queue']] = None,
         depends_on: Optional[List[Any]] = None,
-        at_front: Optional[bool] = None,
+        at_front: bool = False,
         meta: Optional[Dict[Any, Any]] = None,
         description: Optional[str] = None,
         failure_ttl: Optional[int] = None,
diff --git a/rq/job.py b/rq/job.py
index 7e7e964..17a8079 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -153,7 +153,7 @@ class Job:
         depends_on: Optional[JobDependencyType] = None,
         timeout: Optional[int] = None,
         id: Optional[str] = None,
-        origin=None,
+        origin: str = '',
         meta: Optional[Dict[str, Any]] = None,
         failure_ttl: Optional[int] = None,
         serializer=None,
@@ -221,7 +221,7 @@ class Job:
         if id is not None:
             job.set_id(id)
 
-        if origin is not None:
+        if origin:
             job.origin = origin
 
         # Set the core job tuple properties
@@ -504,7 +504,7 @@ class Job:
         self._data = UNEVALUATED
 
     @property
-    def args(self):
+    def args(self) -> tuple:
         if self._args is UNEVALUATED:
             self._deserialize_data()
         return self._args
@@ -608,7 +608,7 @@ class Job:
         self._failure_callback_name = None
         self._failure_callback = UNEVALUATED
         self.description: Optional[str] = None
-        self.origin: Optional[str] = None
+        self.origin: str = ''
         self.enqueued_at: Optional[datetime] = None
         self.started_at: Optional[datetime] = None
         self.ended_at: Optional[datetime] = None
@@ -623,7 +623,7 @@ class Job:
         self.worker_name: Optional[str] = None
         self._status = None
         self._dependency_ids: List[str] = []
-        self.meta: Optional[Dict] = {}
+        self.meta: Dict = {}
         self.serializer = resolve_serializer(serializer)
         self.retries_left: Optional[int] = None
         self.retry_intervals: Optional[List[int]] = None
@@ -883,7 +883,7 @@ class Job:
         self.data = raw_data
 
         self.created_at = str_to_date(obj.get('created_at'))
-        self.origin = as_text(obj.get('origin')) if obj.get('origin') else None
+        self.origin = as_text(obj.get('origin')) if obj.get('origin') else ''
         self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None
         self.description = as_text(obj.get('description')) if obj.get('description') else None
         self.enqueued_at = str_to_date(obj.get('enqueued_at'))
@@ -977,7 +977,7 @@ class Job:
             obj['retries_left'] = self.retries_left
         if self.retry_intervals is not None:
             obj['retry_intervals'] = json.dumps(self.retry_intervals)
-        if self.origin is not None:
+        if self.origin:
             obj['origin'] = self.origin
         if self.description is not None:
             obj['description'] = self.description
diff --git a/rq/maintenance.py b/rq/maintenance.py
new file mode 100644
index 0000000..b078a23
--- /dev/null
+++ b/rq/maintenance.py
@@ -0,0 +1,25 @@
+from typing import TYPE_CHECKING
+
+from .queue import Queue
+from .utils import as_text
+
+if TYPE_CHECKING:
+    from .worker import BaseWorker
+
+
+def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
+    """
+    Check whether there are any jobs stuck in the intermediate queue.
+
+    A job may be stuck in the intermediate queue if a worker has successfully dequeued a job
+    but was not able to push it to the StartedJobRegistry. This may happen in rare cases
+    of hardware or network failure.
+
+    We consider a job to be stuck in the intermediate queue if it doesn't exist in the StartedJobRegistry.
+    """
+    job_ids = [as_text(job_id) for job_id in queue.connection.lrange(queue.intermediate_queue_key, 0, -1)]
+    for job_id in job_ids:
+        if job_id not in queue.started_job_registry:
+            job = queue.fetch_job(job_id)
+            worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.')
+            queue.connection.lrem(queue.intermediate_queue_key, 1, job_id)
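For orientation, a minimal sketch of how this helper is meant to be driven (the queue name and connection setup here are assumptions for illustration, not part of the patch):

    from redis import Redis

    from rq.maintenance import clean_intermediate_queue
    from rq.queue import Queue
    from rq.worker import Worker

    connection = Redis()
    queue = Queue('default', connection=connection)
    worker = Worker([queue], connection=connection)

    # Any job id still sitting in queue.intermediate_queue_key but absent from
    # the StartedJobRegistry is treated as stuck: it is failed through
    # worker.handle_job_failure() and then LREM'd from the intermediate list.
    clean_intermediate_queue(worker, queue)

In the patch itself this call is wired into BaseWorker.clean_registries() (see rq/worker.py below), so it runs under the queue's maintenance lock rather than ad hoc.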
diff --git a/rq/queue.py b/rq/queue.py
index 2d74ddf..9b1e1e5 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -138,6 +138,18 @@ class Queue:
             death_penalty_class=death_penalty_class,
         )
 
+    @classmethod
+    def get_intermediate_queue_key(cls, key: str) -> str:
+        """Returns the intermediate queue key for a given queue key.
+
+        Args:
+            key (str): The queue key
+
+        Returns:
+            str: The intermediate queue key
+        """
+        return f'{key}:intermediate'
+
     def __init__(
         self,
         name: str = 'default',
@@ -209,6 +221,11 @@ class Queue:
         """Returns the Redis key for this Queue."""
         return self._key
 
+    @property
+    def intermediate_queue_key(self):
+        """Returns the Redis key for intermediate queue."""
+        return self.get_intermediate_queue_key(self._key)
+
     @property
     def registry_cleaning_key(self):
         """Redis key used to indicate this queue has been cleaned."""
@@ -221,7 +238,7 @@ class Queue:
         pid = self.connection.get(RQScheduler.get_locking_key(self.name))
         return int(pid.decode()) if pid is not None else None
 
-    def acquire_cleaning_lock(self) -> bool:
+    def acquire_maintenance_lock(self) -> bool:
         """Returns a boolean indicating whether a lock to clean this queue
         is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
@@ -290,7 +307,7 @@ class Queue:
         return self.count == 0
 
     @property
-    def is_async(self):
+    def is_async(self) -> bool:
         """Returns whether the current queue is async."""
         return bool(self._is_async)
@@ -1175,8 +1192,8 @@ class Queue:
     @classmethod
     def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
-        """Helper method.  Intermediate method to abstract away from some
-        Redis API details, where LPOP accepts only a single key, whereas BLPOP
+        """Helper method to abstract away from some Redis API details
+        where LPOP accepts only a single key, whereas BLPOP
         accepts multiple.  So if we want the non-blocking LPOP, we need to
         iterate over all queues, do individual LPOPs, and return the result.
@@ -1207,7 +1224,7 @@ class Queue:
             logger.debug(f"Starting BLPOP operation for queues {colored_queues} with timeout of {timeout}")
             result = connection.blpop(queue_keys, timeout)
             if result is None:
-                logger.debug(f"BLPOP Timeout, no jobs found on queues {colored_queues}")
+                logger.debug(f"BLPOP timeout, no jobs found on queues {colored_queues}")
                 raise DequeueTimeout(timeout, queue_keys)
             queue_key, job_id = result
             return queue_key, job_id
@@ -1218,6 +1235,27 @@ class Queue:
                     return queue_key, blob
             return None
 
+    @classmethod
+    def lmove(cls, connection: 'Redis', queue_key: str, timeout: Optional[int]):
+        """Similar to lpop, but accepts only a single queue key and immediately pushes
+        the result to an intermediate queue.
+        """
+        if timeout is not None:  # blocking variant
+            if timeout == 0:
+                raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
+            colored_queue = green(queue_key)
+            logger.debug(f"Starting BLMOVE operation for {colored_queue} with timeout of {timeout}")
+            result = connection.blmove(queue_key, cls.get_intermediate_queue_key(queue_key), timeout)
+            if result is None:
+                logger.debug(f"BLMOVE timeout, no jobs found on {colored_queue}")
+                raise DequeueTimeout(timeout, queue_key)
+            return queue_key, result
+        else:  # non-blocking variant
+            result = connection.lmove(queue_key, cls.get_intermediate_queue_key(queue_key))
+            if result is not None:
+                return queue_key, result
+            return None
+
     @classmethod
     def dequeue_any(
         cls,
@@ -1256,7 +1294,10 @@ class Queue:
 
         while True:
             queue_keys = [q.key for q in queues]
-            result = cls.lpop(queue_keys, timeout, connection=connection)
+            if len(queue_keys) == 1 and get_version(connection) >= (6, 2, 0):
+                result = cls.lmove(connection, queue_keys[0], timeout)
+            else:
+                result = cls.lpop(queue_keys, timeout, connection=connection)
             if result is None:
                 return None
             queue_key, job_id = map(as_text, result)
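Putting the new queue pieces together, a rough sketch of the dequeue branch this file now takes (the queue name and connection are assumed for illustration):

    from redis import Redis

    from rq.queue import Queue
    from rq.utils import get_version

    connection = Redis()
    queue = Queue('default', connection=connection)

    # 'rq:queue:default' -> 'rq:queue:default:intermediate'
    print(Queue.get_intermediate_queue_key(queue.key))

    queue_keys = [queue.key]
    if len(queue_keys) == 1 and get_version(connection) >= (6, 2, 0):
        # LMOVE/BLMOVE atomically pops a job id and parks it in the
        # intermediate queue, so a crash between dequeue and registration
        # leaves a visible trace instead of silently losing the job.
        result = Queue.lmove(connection, queue_keys[0], timeout=None)
    else:
        # Older servers and multi-queue workers keep the LPOP/BLPOP path.
        result = Queue.lpop(queue_keys, timeout=None, connection=connection)

Note that dequeue_any() with timeout=0 raises ValueError on both paths, since RQ does not support indefinite blocking timeouts.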
diff --git a/rq/registry.py b/rq/registry.py
index b955d6b..31b02cb 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -66,7 +66,7 @@ class BaseRegistry:
             and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
         )
 
-    def __contains__(self, item: Union[str, 'Job']):
+    def __contains__(self, item: Union[str, 'Job']) -> bool:
         """
         Returns a boolean indicating registry contains the given
         job instance or job id.
diff --git a/rq/worker.py b/rq/worker.py
index 062b1b4..a557278 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -48,22 +48,14 @@ from .defaults import (
 )
 from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException
 from .job import Job, JobStatus
 from .logutils import blue, green, setup_loghandlers, yellow
+from .maintenance import clean_intermediate_queue
 from .queue import Queue
 from .registry import StartedJobRegistry, clean_registries
 from .scheduler import RQScheduler
 from .serializers import resolve_serializer
 from .suspension import is_suspended
 from .timeouts import HorseMonitorTimeoutException, JobTimeoutException, UnixSignalDeathPenalty
-from .utils import (
-    as_text,
-    backend_class,
-    compact,
-    ensure_list,
-    get_version,
-    utcformat,
-    utcnow,
-    utcparse,
-)
+from .utils import as_text, backend_class, compact, ensure_list, get_version, utcformat, utcnow, utcparse
 from .version import VERSION
 
 try:
@@ -294,6 +286,38 @@ class BaseWorker:
                 return True
         return False
 
+    def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
+        """Configures the Redis connection to have a socket timeout.
+        This should time out the connection in case any specific command hangs at any given time (eg. BLPOP).
+        If the connection provided already has a `socket_timeout` defined, skips.
+
+        Args:
+            connection (Optional[Redis]): The Redis Connection.
+        """
+        if connection is None:
+            connection = get_current_connection()
+        current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
+        if current_socket_timeout is None:
+            timeout_config = {"socket_timeout": self.connection_timeout}
+            connection.connection_pool.connection_kwargs.update(timeout_config)
+        return connection
+
+    @property
+    def dequeue_timeout(self) -> int:
+        return max(1, self.worker_ttl - 15)
+
+    def clean_registries(self):
+        """Runs maintenance jobs on each Queue's registries."""
+        for queue in self.queues:
+            # If there are multiple workers running, we only want 1 worker
+            # to run clean_registries().
+            if queue.acquire_maintenance_lock():
+                self.log.info('Cleaning registries for queue: %s', queue.name)
+                clean_registries(queue)
+                worker_registration.clean_worker_registry(queue)
+                clean_intermediate_queue(self, queue)
+                self.last_cleaned_at = utcnow()
+
     def get_redis_server_version(self):
         """Return Redis server version of connection"""
         if not self.redis_server_version:
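With dequeue_timeout and _set_connection() now living on BaseWorker, the timeout chain is visible in one place. A worked example, assuming a worker_ttl of 420 seconds (RQ's documented default, used here only to make the arithmetic concrete):

    # Plain arithmetic, not RQ API calls.
    worker_ttl = 420                            # assumed default
    dequeue_timeout = max(1, worker_ttl - 15)   # 405s: BLPOP/BLMOVE timeout
    connection_timeout = dequeue_timeout + 10   # 415s: socket_timeout that
                                                # _set_connection() installs
    assert (dequeue_timeout, connection_timeout) == (405, 415)

The 10-second margin keeps the socket timeout strictly longer than the blocking dequeue, so a healthy BLMOVE is not killed by its own connection.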
@@ -683,22 +707,6 @@ class Worker(BaseWorker):
             worker.refresh()
         return worker
 
-    def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
-        """Configures the Redis connection to have a socket timeout.
-        This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP).
-        If the connection provided already has a `socket_timeout` defined, skips.
-
-        Args:
-            connection (Optional[Redis]): The Redis Connection.
-        """
-        if connection is None:
-            connection = get_current_connection()
-        current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
-        if current_socket_timeout is None:
-            timeout_config = {"socket_timeout": self.connection_timeout}
-            connection.connection_pool.connection_kwargs.update(timeout_config)
-        return connection
-
     @property
     def horse_pid(self):
         """The horse's process ID.  Only available in the worker. Will return
@@ -711,10 +719,6 @@ class Worker(BaseWorker):
         """Returns whether or not this is the worker or the work horse."""
         return self._is_horse
 
-    @property
-    def dequeue_timeout(self) -> int:
-        return max(1, self.worker_ttl - 15)
-
     @property
     def connection_timeout(self) -> int:
         return self.dequeue_timeout + 10
@@ -1263,7 +1267,7 @@ class Worker(BaseWorker):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
-    def prepare_job_execution(self, job: 'Job'):
+    def prepare_job_execution(self, job: 'Job', remove_from_intermediate_queue: bool = False):
         """Performs misc bookkeeping like updating states prior to
         job execution.
         """
@@ -1277,6 +1281,11 @@ class Worker(BaseWorker):
             job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
             job.prepare_for_execution(self.name, pipeline=pipeline)
+            if remove_from_intermediate_queue:
+                from .queue import Queue
+
+                queue = Queue(job.origin, connection=self.connection)
+                pipeline.lrem(queue.intermediate_queue_key, 1, job.id)
             pipeline.execute()
         self.log.debug('Job preparation finished.')
@@ -1407,7 +1416,8 @@ class Worker(BaseWorker):
             self.log.debug('Started Job Registry set.')
 
         try:
-            self.prepare_job_execution(job)
+            remove_from_intermediate_queue = len(self.queues) == 1
+            self.prepare_job_execution(job, remove_from_intermediate_queue)
 
             job.started_at = utcnow()
             timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
@@ -1524,17 +1534,6 @@ class Worker(BaseWorker):
         """The hash does not take the database/connection into account"""
         return hash(self.name)
 
-    def clean_registries(self):
-        """Runs maintenance jobs on each Queue's registries."""
-        for queue in self.queues:
-            # If there are multiple workers running, we only want 1 worker
-            # to run clean_registries().
-            if queue.acquire_cleaning_lock():
-                self.log.info('Cleaning registries for queue: %s', queue.name)
-                clean_registries(queue)
-                worker_registration.clean_worker_registry(queue)
-                self.last_cleaned_at = utcnow()
-
     def handle_payload(self, message):
         """Handle external commands"""
         self.log.debug('Received message: %s', message)
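The remove_from_intermediate_queue flag folds the intermediate-queue cleanup into the same Redis pipeline that marks a job as started. Roughly, with a hypothetical job id standing in for the real one:

    from redis import Redis

    from rq.queue import Queue

    connection = Redis()
    queue = Queue('default', connection=connection)

    with connection.pipeline() as pipeline:
        # ...job.heartbeat(...) and job.prepare_for_execution(...) are queued
        # here in the real method...
        pipeline.lrem(queue.intermediate_queue_key, 1, 'some-job-id')  # hypothetical id
        pipeline.execute()

perform_job() only sets the flag when the worker listens on a single queue, mirroring dequeue_any(), which only takes the LMOVE path for a single queue key.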
diff --git a/tests/test_job.py b/tests/test_job.py
index 444080f..f855fa8 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -49,7 +49,7 @@ class TestJob(RQTestCase):
         self.assertEqual(str(job), "<Job %s: None>" % job.id)
 
         # ...and nothing else
-        self.assertIsNone(job.origin)
+        self.assertEqual(job.origin, '')
         self.assertIsNone(job.enqueued_at)
         self.assertIsNone(job.started_at)
         self.assertIsNone(job.ended_at)
@@ -87,7 +87,7 @@ class TestJob(RQTestCase):
         self.assertEqual(job.kwargs, {'z': 2})
 
         # ...but metadata is not
-        self.assertIsNone(job.origin)
+        self.assertEqual(job.origin, '')
         self.assertIsNone(job.enqueued_at)
         self.assertIsNone(job.result)
diff --git a/tests/test_maintenance.py b/tests/test_maintenance.py
new file mode 100644
index 0000000..8cef010
--- /dev/null
+++ b/tests/test_maintenance.py
@@ -0,0 +1,36 @@
+import unittest
+from unittest.mock import patch
+
+from redis import Redis
+
+from rq.job import JobStatus
+from rq.maintenance import clean_intermediate_queue
+from rq.queue import Queue
+from rq.utils import get_version
+from rq.worker import Worker
+from tests import RQTestCase
+from tests.fixtures import say_hello
+
+
+class MaintenanceTestCase(RQTestCase):
+    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
+    def test_cleanup_intermediate_queue(self):
+        """Ensure jobs stuck in the intermediate queue are cleaned up."""
+        queue = Queue('foo', connection=self.testconn)
+        job = queue.enqueue(say_hello)
+
+        # If job execution fails after it's dequeued, the job lands in the
+        # intermediate queue and its status is still QUEUED
+        with patch.object(Worker, 'execute_job'):
+            worker = Worker(queue, connection=self.testconn)
+            worker.work(burst=True)
+
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+        self.assertFalse(job.id in queue.get_job_ids())
+        self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
+
+        # After cleaning the intermediate queue, job status should be `FAILED`
+        # and the job is removed from the intermediate queue
+        clean_intermediate_queue(worker, queue)
+        self.assertEqual(job.get_status(), JobStatus.FAILED)
+        self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
diff --git a/tests/test_queue.py b/tests/test_queue.py
index e91ae54..4ae4e26 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -1,7 +1,10 @@
 import json
+import unittest
 from datetime import datetime, timedelta, timezone
 from unittest.mock import patch
 
+from redis import Redis
+
 from rq import Queue, Retry
 from rq.job import Job, JobStatus
 from rq.registry import (
@@ -13,6 +16,7 @@ from rq.registry import (
     StartedJobRegistry,
 )
 from rq.serializers import JSONSerializer
+from rq.utils import get_version
 from rq.worker import Worker
 from tests import RQTestCase
 from tests.fixtures import echo, say_hello
@@ -226,8 +230,10 @@ class TestQueue(RQTestCase):
 
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
-        fooq = Queue('foo')
-        barq = Queue('bar')
+        fooq = Queue('foo', connection=self.testconn)
+        barq = Queue('bar', connection=self.testconn)
+
+        self.assertRaises(ValueError, Queue.dequeue_any, [fooq, barq], timeout=0, connection=self.testconn)
 
         self.assertEqual(Queue.dequeue_any([fooq, barq], None), None)
@@ -253,6 +259,46 @@ class TestQueue(RQTestCase):
         self.assertEqual(job.origin, barq.name)
         self.assertEqual(job.args[0], 'for Bar', 'Bar should be dequeued second.')
 
+    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
+    def test_dequeue_any_reliable(self):
+        """Dequeueing a job from a single queue moves it to the intermediate queue."""
+        foo_queue = Queue('foo', connection=self.testconn)
+        job_1 = foo_queue.enqueue(say_hello)
+        self.assertRaises(ValueError, Queue.dequeue_any, [foo_queue], timeout=0, connection=self.testconn)
+
+        # Job ID is not in the intermediate queue
+        self.assertIsNone(self.testconn.lpos(foo_queue.intermediate_queue_key, job_1.id))
+        job, queue = Queue.dequeue_any([foo_queue], timeout=None, connection=self.testconn)
+        self.assertEqual(queue, foo_queue)
+        self.assertEqual(job.func, say_hello)
+        # After the job is dequeued, its ID is in the intermediate queue
+        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 0)
+
+        # Test the blocking version
+        foo_queue.enqueue(say_hello)
+        job, queue = Queue.dequeue_any([foo_queue], timeout=1, connection=self.testconn)
+        self.assertEqual(queue, foo_queue)
+        self.assertEqual(job.func, say_hello)
+        # After the job is dequeued, its ID is in the intermediate queue
+        self.assertEqual(self.testconn.lpos(foo_queue.intermediate_queue_key, job.id), 1)
+
+    @unittest.skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0')
+    def test_intermediate_queue(self):
+        """Job should be stuck in the intermediate queue if execution fails after dequeue."""
+        queue = Queue('foo', connection=self.testconn)
+        job = queue.enqueue(say_hello)
+
+        # If job execution fails after it's dequeued, the job lands in the
+        # intermediate queue and its status is still QUEUED
+        with patch.object(Worker, 'execute_job'):
+            worker = Worker(queue, connection=self.testconn)
+            worker.work(burst=True)
+
+        # Job status is still QUEUED even though it's already dequeued
+        self.assertEqual(job.get_status(refresh=True), JobStatus.QUEUED)
+        self.assertFalse(job.id in queue.get_job_ids())
+        self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id))
+
     def test_dequeue_any_ignores_nonexisting_jobs(self):
         """Dequeuing (from any queue) silently ignores non-existing jobs."""
@@ -324,6 +371,7 @@ class TestQueue(RQTestCase):
 
     def test_synchronous_timeout(self):
         queue = Queue(is_async=False)
+        self.assertFalse(queue.is_async)
 
         no_expire_job = queue.enqueue(echo, result_ttl=-1)
         self.assertEqual(queue.connection.ttl(no_expire_job.key), -1)
@@ -683,7 +731,7 @@ class TestQueue(RQTestCase):
         """Fetch a job from a queue."""
         q = Queue('example')
         job_orig = q.enqueue(say_hello)
-        job_fetch = q.fetch_job(job_orig.id)
+        job_fetch: Job = q.fetch_job(job_orig.id)  # type: ignore
         self.assertIsNotNone(job_fetch)
         self.assertEqual(job_orig.id, job_fetch.id)
         self.assertEqual(job_orig.description, job_fetch.description)
StartedJobRegist from rq.results import Result from rq.serializers import JSONSerializer from rq.suspension import resume, suspend -from rq.utils import as_text, utcnow +from rq.utils import as_text, get_version, utcnow from rq.version import VERSION from rq.worker import HerokuWorker, RandomWorker, RoundRobinWorker, WorkerStatus from tests import RQTestCase, slow @@ -799,6 +800,28 @@ class TestWorker(RQTestCase): self.assertEqual(job._status, JobStatus.STARTED) self.assertEqual(job.worker_name, worker.name) + @skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') + def test_prepare_job_execution_removes_key_from_intermediate_queue(self): + """Prepare job execution removes job from intermediate queue.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + + Queue.dequeue_any([queue], timeout=None, connection=self.testconn) + self.assertIsNotNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) + worker = Worker([queue]) + worker.prepare_job_execution(job, remove_from_intermediate_queue=True) + self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) + self.assertEqual(queue.count, 0) + + @skipIf(get_version(Redis()) < (6, 2, 0), 'Skip if Redis server < 6.2.0') + def test_work_removes_key_from_intermediate_queue(self): + """Worker removes job from intermediate queue.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + worker = Worker([queue]) + worker.work(burst=True) + self.assertIsNone(self.testconn.lpos(queue.intermediate_queue_key, job.id)) + def test_work_unicode_friendly(self): """Worker processes work with unicode description, then quits.""" q = Queue('foo') diff --git a/tox.ini b/tox.ini index 5adb901..904fe45 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,6 @@ deps= psutil passenv= RUN_SSL_TESTS - RUN_SLOW_TESTS_TOO [testenv:lint] basepython = python3.10