add periodic worker heartbeats (#945)

* add periodic worker heartbeats

fixes #944

* improve worker default option handling
Branch: main
Authored by Thomas Kriechbaumer 7 years ago, committed by Selwin Ong
parent 63a04d275e
commit 3133d94b58

rq/cli/cli.py
@@ -18,7 +18,9 @@ from rq.cli.helpers import (read_config_file, refresh,
show_both, show_queues, show_workers, CliConfig)
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL,
DEFAULT_JOB_MONITORING_INTERVAL)
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import (suspend as connection_suspend,
@@ -172,8 +174,9 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--logging_level', type=str, default="INFO", help='Set logging level')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL , help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL , help='Default worker timeout to be used')
@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL , help='Default job monitoring interval to be used')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@@ -182,8 +185,8 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, verbose, quiet, sentry_dsn, exception_handler,
pid, queues, **options):
worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn,
exception_handler, pid, queues, **options):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
@@ -217,6 +220,7 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
connection=cli_config.connection,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_monitoring_interval=job_monitoring_interval,
job_class=cli_config.job_class,
queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None)

rq/defaults.py
@@ -3,4 +3,5 @@ DEFAULT_QUEUE_CLASS = 'rq.Queue'
DEFAULT_WORKER_CLASS = 'rq.Worker'
DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
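
A minimal usage sketch (not part of this diff), assuming a local Redis
instance, showing how the new keyword reaches Worker directly from Python;
the values mirror the defaults added above:

    from redis import StrictRedis
    from rq import Queue, Worker

    connection = StrictRedis()
    queue = Queue(connection=connection)

    # While the work horse executes a long job, the parent process now
    # refreshes its own Redis key every job_monitoring_interval seconds;
    # the 420s worker TTL and 500s result TTL defaults are unchanged.
    worker = Worker([queue],
                    connection=connection,
                    job_monitoring_interval=10)  # library default is 30
    worker.work(burst=True)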

rq/timeouts.py
@@ -5,18 +5,31 @@ from __future__ import (absolute_import, division, print_function,
import signal
class JobTimeoutException(Exception):
class BaseTimeoutException(Exception):
"""Base exception for timeouts."""
pass
class JobTimeoutException(BaseTimeoutException):
"""Raised when a job takes longer to complete than the allowed maximum
timeout value.
"""
pass
class HorseMonitorTimeoutException(BaseTimeoutException):
"""Raised when waiting for a horse exiting takes longer than the maximum
timeout value.
"""
pass
class BaseDeathPenalty(object):
"""Base class to setup job timeouts."""
def __init__(self, timeout):
def __init__(self, timeout, exception=JobTimeoutException):
self._timeout = timeout
self._exception = exception
def __enter__(self):
self.setup_death_penalty()
@@ -25,7 +38,7 @@ class BaseDeathPenalty(object):
# Always cancel immediately, since we're done
try:
self.cancel_death_penalty()
except JobTimeoutException:
except BaseTimeoutException:
# Weird case: we're done with the with body, but now the alarm is
# fired. We may safely ignore this situation and consider the
# body done.
@@ -33,7 +46,7 @@ class BaseDeathPenalty(object):
# __exit__ may return True to suppress further exception handling. We
# don't want to suppress any exceptions here, since all errors should
# just pass through, JobTimeoutException being handled normally to the
# just pass through, BaseTimeoutException being handled normally to the
# invoking context.
return False
@@ -47,13 +60,12 @@ class BaseDeathPenalty(object):
class UnixSignalDeathPenalty(BaseDeathPenalty):
def handle_death_penalty(self, signum, frame):
raise JobTimeoutException('Job exceeded maximum timeout '
'value ({0} seconds)'.format(self._timeout))
raise self._exception('Task exceeded maximum timeout value '
'({0} seconds)'.format(self._timeout))
def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises
a JobTimeoutException after the timeout amount (expressed in
seconds).
an exception after the timeout amount (expressed in seconds).
"""
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)
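
A minimal sketch (not part of this diff) of the new two-exception design: the
same UnixSignalDeathPenalty context manager raises whichever exception class
it is handed, so a horse-monitoring timeout can be caught without also
swallowing genuine job timeouts:

    import time

    from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty

    def blocking_wait():
        # Stands in for os.waitpid() on a work horse that is still busy.
        time.sleep(10)

    try:
        # The SIGALRM handler raises the class passed as `exception`.
        with UnixSignalDeathPenalty(2, HorseMonitorTimeoutException):
            blocking_wait()
    except HorseMonitorTimeoutException:
        print('monitoring interval elapsed; the worker would heartbeat here')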

rq/worker.py
@@ -24,14 +24,14 @@ from redis import WatchError
from . import worker_registration
from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL
from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue, get_failed_queue
from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum,
make_colorizer, utcformat, utcnow, utcparse)
from .version import VERSION
@@ -155,9 +155,11 @@ class Worker(object):
return worker
def __init__(self, queues, name=None, default_result_ttl=None, connection=None,
exc_handler=None, exception_handlers=None, default_worker_ttl=None,
job_class=None, queue_class=None): # noqa
def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
connection=None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
queue_class=None,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL): # noqa
if connection is None:
connection = get_current_connection()
self.connection = connection
@@ -175,13 +177,9 @@ class Worker(object):
self.validate_queues()
self._exc_handlers = []
if default_result_ttl is None:
default_result_ttl = DEFAULT_RESULT_TTL
self.default_result_ttl = default_result_ttl
if default_worker_ttl is None:
default_worker_ttl = DEFAULT_WORKER_TTL
self.default_worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval
self._state = 'starting'
self._is_horse = False
@@ -483,7 +481,7 @@ class Worker(object):
self.log.info('Stopping on request')
break
timeout = None if burst else max(1, self.default_worker_ttl - 60)
timeout = None if burst else max(1, self.default_worker_ttl - 15)
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
@@ -531,7 +529,7 @@ class Worker(object):
self.heartbeat()
return result
def heartbeat(self, timeout=0, pipeline=None):
def heartbeat(self, timeout=None, pipeline=None):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
@@ -539,10 +537,10 @@ class Worker(object):
The next heartbeat should come before this time, or the worker will
die (at least from the monitoring dashboards).
The effective timeout can never be shorter than default_worker_ttl,
only larger.
If no timeout is given, the default_worker_ttl will be used to update
the expiration time of the worker.
"""
timeout = max(timeout, self.default_worker_ttl)
timeout = timeout or self.default_worker_ttl
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
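
Note the behavioural change above: heartbeat() no longer clamps the TTL up to
default_worker_ttl, so the monitoring code can set a TTL shorter than 420
seconds while a job runs. A minimal sketch (not part of this diff), assuming a
local Redis instance:

    from redis import StrictRedis
    from rq import Queue, Worker

    conn = StrictRedis()
    w = Worker([Queue(connection=conn)], connection=conn)

    w.heartbeat()    # issues EXPIRE with default_worker_ttl (420 seconds)
    w.heartbeat(35)  # issues EXPIRE with 35 seconds; before this change the
                     # value would have been clamped up to default_worker_ttl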
@@ -610,8 +608,13 @@ class Worker(object):
"""
while True:
try:
self._monitor_work_horse_tick(job)
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val = os.waitpid(self._horse_pid, 0)
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
@@ -621,9 +624,9 @@ class Worker(object):
# which we don't want to catch, so we re-raise those ones.
if e.errno != errno.EINTR:
raise
# Send a heartbeat to keep the worker alive.
self.heartbeat()
def _monitor_work_horse_tick(self, job):
_, ret_val = os.waitpid(self._horse_pid, 0)
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
@@ -696,7 +699,7 @@ class Worker(object):
with self.connection._pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(timeout, pipeline=pipeline)
self.heartbeat(self.job_monitoring_interval + 5, pipeline=pipeline)
registry = StartedJobRegistry(job.origin,
self.connection,
job_class=self.job_class)
@@ -785,7 +788,8 @@ class Worker(object):
try:
job.started_at = utcnow()
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException):
rv = job.perform()
job.ended_at = utcnow()

tests/test_worker.py
@@ -24,7 +24,8 @@ from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid,
run_dummy_heroku_worker, access_self,
modify_self, modify_self_and_error)
modify_self, modify_self_and_error,
long_running_job)
from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
get_current_connection)
@@ -247,6 +248,23 @@ class TestWorker(RQTestCase):
self.testconn.hdel(w.key, 'birth')
w.refresh()
@slow
def test_heartbeat_busy(self):
"""Periodic heartbeats while horse is busy with long jobs"""
q = Queue()
w = Worker([q], job_monitoring_interval=5)
for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]:
job = q.enqueue(long_running_job,
args=(timeout,),
timeout=30,
result_ttl=-1)
with mock.patch.object(w, 'heartbeat', wraps=w.heartbeat) as mocked:
w.execute_job(job, q)
self.assertEqual(mocked.call_count, expected_heartbeats)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
q = Queue()
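
The new test_heartbeat_busy test above relies on a long_running_job fixture
that is not part of this diff; a plausible sketch of it (hypothetical, the
real helper would live in tests/fixtures.py) is simply a job that sleeps for
the requested number of seconds:

    import time

    def long_running_job(timeout=10):
        # Sleep long enough for the monitoring loop to fire at least one
        # periodic heartbeat while the work horse is busy.
        time.sleep(timeout)
        return 'Done sleeping...'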
