Add failure callback call to started job registry cleanup (#1824)

* Add started job registry cleanup job failure callback call

* WIP - need to fix test

* fix test

* rename, tests and docs

* better log message

* use class name

* Update registry.py
main
Rony Lutsky 2 years ago committed by GitHub
parent 95558fcc1d
commit 0ba3971d55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -144,3 +144,11 @@ def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: st
# do your thing here, for example set job.retries_left to 0 # do your thing here, for example set job.retries_left to 0
``` ```
## Built-in Exceptions
RQ exceptions that can be passed to your job failure callbacks:
### AbandonedJobError
This error means an unfinished job was collected by another worker's maintenance task.
This usually happens when a worker is busy with a job and is terminated before it finishes that job.
Another worker collects this job and moves it to the FailedJobRegistry.

@ -30,3 +30,7 @@ class ShutDownImminentException(Exception):
class TimeoutFormatError(Exception): class TimeoutFormatError(Exception):
pass pass
class AbandonedJobError(Exception):
pass

@ -1,22 +1,30 @@
import calendar import calendar
import logging
import traceback
from rq.serializers import resolve_serializer from rq.serializers import resolve_serializer
import time import time
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, List, Optional, Type, Union from typing import TYPE_CHECKING, Any, List, Optional, Type, Union
from .timeouts import JobTimeoutException, UnixSignalDeathPenalty
if TYPE_CHECKING: if TYPE_CHECKING:
from redis import Redis from redis import Redis
from redis.client import Pipeline from redis.client import Pipeline
from .utils import as_text from .utils import as_text
from .connections import resolve_connection from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL from .defaults import DEFAULT_FAILURE_TTL, CALLBACK_TIMEOUT
from .exceptions import InvalidJobOperation, NoSuchJobError from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError
from .job import Job, JobStatus from .job import Job, JobStatus
from .queue import Queue from .queue import Queue
from .utils import backend_class, current_timestamp from .utils import backend_class, current_timestamp
logger = logging.getLogger("rq.registry")
class BaseRegistry: class BaseRegistry:
""" """
Base implementation of a job registry, implemented in Redis sorted set. Base implementation of a job registry, implemented in Redis sorted set.
@ -202,9 +210,10 @@ class StartedJobRegistry(BaseRegistry):
""" """
key_template = 'rq:wip:{0}' key_template = 'rq:wip:{0}'
death_penalty_class = UnixSignalDeathPenalty
def cleanup(self, timestamp: Optional[float] = None): def cleanup(self, timestamp: Optional[float] = None):
"""Remove expired jobs from registry and add them to FailedJobRegistry. """Remove abandoned jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if seconds since the Unix epoch. timestamp defaults to call time if
@ -226,6 +235,14 @@ class StartedJobRegistry(BaseRegistry):
except NoSuchJobError: except NoSuchJobError:
continue continue
if job.failure_callback:
try:
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection,
AbandonedJobError, AbandonedJobError(), traceback.extract_stack())
except: # noqa
logger.exception('Registry %s: error while executing failure callback', self.key)
retry = job.retries_left and job.retries_left > 0 retry = job.retries_left and job.retries_left > 0
if retry: if retry:
@ -233,8 +250,11 @@ class StartedJobRegistry(BaseRegistry):
job.retry(queue, pipeline) job.retry(queue, pipeline)
else: else:
exc_string = f"due to {AbandonedJobError.__name__}"
logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} '
f'({exc_string})')
job.set_status(JobStatus.FAILED) job.set_status(JobStatus.FAILED)
job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}"
job.save(pipeline=pipeline, include_meta=False) job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl) failed_job_registry.add(job, job.failure_ttl)

@ -1346,6 +1346,9 @@ class Worker:
Args: Args:
job (Job): The Job job (Job): The Job
""" """
if not job.failure_callback:
return
self.log.debug(f"Running failure callbacks for {job.id}") self.log.debug(f"Running failure callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT) job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
@ -1392,13 +1395,12 @@ class Worker:
exc_info = sys.exc_info() exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info)) exc_string = ''.join(traceback.format_exception(*exc_info))
if job.failure_callback: try:
try: self.execute_failure_callback(job, *exc_info)
self.execute_failure_callback(job, *exc_info) except: # noqa
except: # noqa exc_info = sys.exc_info()
exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info))
exc_string = ''.join(traceback.format_exception(*exc_info)) self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info)
self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info)
self.handle_job_failure( self.handle_job_failure(
job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry

@ -1,9 +1,12 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from unittest import mock
from unittest.mock import PropertyMock, ANY
from rq.serializers import JSONSerializer from rq.serializers import JSONSerializer
from rq.utils import as_text from rq.utils import as_text
from rq.defaults import DEFAULT_FAILURE_TTL from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import InvalidJobOperation from rq.exceptions import InvalidJobOperation, AbandonedJobError
from rq.job import Job, JobStatus, requeue_job from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue from rq.queue import Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
@ -161,7 +164,9 @@ class TestRegistry(RQTestCase):
self.assertNotIn(job, failed_job_registry) self.assertNotIn(job, failed_job_registry)
self.assertIn(job, self.registry) self.assertIn(job, self.registry)
self.registry.cleanup() with mock.patch.object(Job, 'failure_callback', PropertyMock()) as mocked:
self.registry.cleanup()
mocked.return_value.assert_any_call(job, self.testconn, AbandonedJobError, ANY, ANY)
self.assertIn(job.id, failed_job_registry) self.assertIn(job.id, failed_job_registry)
self.assertNotIn(job, self.registry) self.assertNotIn(job, self.registry)
job.refresh() job.refresh()

Loading…
Cancel
Save