From 0ba3971d55824e79af66d9e033811cee49de4ef5 Mon Sep 17 00:00:00 2001 From: Rony Lutsky <3050627+ronlut@users.noreply.github.com> Date: Sat, 4 Mar 2023 03:02:13 +0200 Subject: [PATCH] Add failure callback call to started job registry cleanup (#1824) * Add started job registry cleanup job failure callback call * WIP - need to fix test * fix test * rename, tests and docs * better log message * use class name * Update registry.py --- docs/docs/exceptions.md | 8 ++++++++ rq/exceptions.py | 4 ++++ rq/registry.py | 28 ++++++++++++++++++++++++---- rq/worker.py | 16 +++++++++------- tests/test_registry.py | 9 +++++++-- 5 files changed, 52 insertions(+), 13 deletions(-) diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md index c036354..2ab8473 100644 --- a/docs/docs/exceptions.md +++ b/docs/docs/exceptions.md @@ -144,3 +144,11 @@ def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: st # do your thing here, for example set job.retries_left to 0 ``` + +## Built-in Exceptions +RQ Exceptions you can get in your job failure callbacks + +# AbandonedJobError +This error means an unfinished job was collected by another worker's maintenance task. +This usually happens when a worker is busy with a job and is terminated before it finished that job. +Another worker collects this job and moves it to the FailedJobRegistry. \ No newline at end of file diff --git a/rq/exceptions.py b/rq/exceptions.py index ee51753..b84ade1 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -30,3 +30,7 @@ class ShutDownImminentException(Exception): class TimeoutFormatError(Exception): pass + + +class AbandonedJobError(Exception): + pass diff --git a/rq/registry.py b/rq/registry.py index 509bd87..2b51fff 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,22 +1,30 @@ import calendar +import logging +import traceback + from rq.serializers import resolve_serializer import time from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, List, Optional, Type, Union +from .timeouts import JobTimeoutException, UnixSignalDeathPenalty + if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline from .utils import as_text from .connections import resolve_connection -from .defaults import DEFAULT_FAILURE_TTL -from .exceptions import InvalidJobOperation, NoSuchJobError +from .defaults import DEFAULT_FAILURE_TTL, CALLBACK_TIMEOUT +from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError from .job import Job, JobStatus from .queue import Queue from .utils import backend_class, current_timestamp +logger = logging.getLogger("rq.registry") + + class BaseRegistry: """ Base implementation of a job registry, implemented in Redis sorted set. @@ -202,9 +210,10 @@ class StartedJobRegistry(BaseRegistry): """ key_template = 'rq:wip:{0}' + death_penalty_class = UnixSignalDeathPenalty def cleanup(self, timestamp: Optional[float] = None): - """Remove expired jobs from registry and add them to FailedJobRegistry. + """Remove abandoned jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as seconds since the Unix epoch. timestamp defaults to call time if @@ -226,6 +235,14 @@ class StartedJobRegistry(BaseRegistry): except NoSuchJobError: continue + if job.failure_callback: + try: + with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): + job.failure_callback(job, self.connection, + AbandonedJobError, AbandonedJobError(), traceback.extract_stack()) + except: # noqa + logger.exception('Registry %s: error while executing failure callback', self.key) + retry = job.retries_left and job.retries_left > 0 if retry: @@ -233,8 +250,11 @@ class StartedJobRegistry(BaseRegistry): job.retry(queue, pipeline) else: + exc_string = f"due to {AbandonedJobError.__name__}" + logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' + f'({exc_string})') job.set_status(JobStatus.FAILED) - job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() + job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}" job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl) diff --git a/rq/worker.py b/rq/worker.py index 2cf4d18..cb63c65 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1346,6 +1346,9 @@ class Worker: Args: job (Job): The Job """ + if not job.failure_callback: + return + self.log.debug(f"Running failure callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): @@ -1392,13 +1395,12 @@ class Worker: exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) - if job.failure_callback: - try: - self.execute_failure_callback(job, *exc_info) - except: # noqa - exc_info = sys.exc_info() - exc_string = ''.join(traceback.format_exception(*exc_info)) - self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info) + try: + self.execute_failure_callback(job, *exc_info) + except: # noqa + exc_info = sys.exc_info() + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info) self.handle_job_failure( job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry diff --git a/tests/test_registry.py b/tests/test_registry.py index 28a29ca..152e3fd 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,9 +1,12 @@ from datetime import datetime, timedelta +from unittest import mock +from unittest.mock import PropertyMock, ANY + from rq.serializers import JSONSerializer from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL -from rq.exceptions import InvalidJobOperation +from rq.exceptions import InvalidJobOperation, AbandonedJobError from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue from rq.utils import current_timestamp @@ -161,7 +164,9 @@ class TestRegistry(RQTestCase): self.assertNotIn(job, failed_job_registry) self.assertIn(job, self.registry) - self.registry.cleanup() + with mock.patch.object(Job, 'failure_callback', PropertyMock()) as mocked: + self.registry.cleanup() + mocked.return_value.assert_any_call(job, self.testconn, AbandonedJobError, ANY, ANY) self.assertIn(job.id, failed_job_registry) self.assertNotIn(job, self.registry) job.refresh()