diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 7e67ac5..8c71b60 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -18,7 +18,9 @@ from rq.cli.helpers import (read_config_file, refresh, show_both, show_queues,
                             show_workers, CliConfig)
 from rq.contrib.legacy import cleanup_ghosts
 from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
-                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
+                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
+                         DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL,
+                         DEFAULT_JOB_MONITORING_INTERVAL)
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
 from rq.suspension import (suspend as connection_suspend,
@@ -172,8 +174,9 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
 @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
 @click.option('--logging_level', type=str, default="INFO", help='Set logging level')
 @click.option('--name', '-n', help='Specify a different name')
-@click.option('--results-ttl', type=int, help='Default results timeout to be used')
-@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
+@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
+@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
+@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used')
 @click.option('--verbose', '-v', is_flag=True, help='Show more output')
 @click.option('--quiet', '-q', is_flag=True, help='Show less output')
 @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@@ -182,8 +185,8 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
 @click.argument('queues', nargs=-1)
 @pass_cli_config
 def worker(cli_config, burst, logging_level, name, results_ttl,
-           worker_ttl, verbose, quiet, sentry_dsn, exception_handler,
-           pid, queues, **options):
+           worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn,
+           exception_handler, pid, queues, **options):
     """Starts an RQ worker."""
 
     settings = read_config_file(cli_config.config) if cli_config.config else {}
@@ -217,6 +220,7 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
                               connection=cli_config.connection,
                               default_worker_ttl=worker_ttl,
                               default_result_ttl=results_ttl,
+                              job_monitoring_interval=job_monitoring_interval,
                               job_class=cli_config.job_class,
                               queue_class=cli_config.queue_class,
                               exception_handlers=exception_handlers or None)
diff --git a/rq/defaults.py b/rq/defaults.py
index 002c44b..57423c6 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -3,4 +3,5 @@ DEFAULT_QUEUE_CLASS = 'rq.Queue'
 DEFAULT_WORKER_CLASS = 'rq.Worker'
 DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
 DEFAULT_WORKER_TTL = 420
+DEFAULT_JOB_MONITORING_INTERVAL = 30
 DEFAULT_RESULT_TTL = 500
diff --git a/rq/timeouts.py b/rq/timeouts.py
index a6afdf2..b211097 100644
--- a/rq/timeouts.py
+++ b/rq/timeouts.py
@@ -5,18 +5,31 @@ from __future__ import (absolute_import, division, print_function,
 import signal
 
 
-class JobTimeoutException(Exception):
+class BaseTimeoutException(Exception):
+    """Base exception for timeouts."""
+    pass
+
+
+class JobTimeoutException(BaseTimeoutException):
     """Raised when a job takes longer to complete than the allowed maximum
     timeout value.
     """
     pass
 
 
+class HorseMonitorTimeoutException(BaseTimeoutException):
+    """Raised when waiting for a horse to exit takes longer than the maximum
+    timeout value.
+    """
+    pass
+
+
 class BaseDeathPenalty(object):
     """Base class to setup job timeouts."""
 
-    def __init__(self, timeout):
+    def __init__(self, timeout, exception=JobTimeoutException):
         self._timeout = timeout
+        self._exception = exception
 
     def __enter__(self):
         self.setup_death_penalty()
@@ -25,7 +38,7 @@ class BaseDeathPenalty(object):
         # Always cancel immediately, since we're done
         try:
             self.cancel_death_penalty()
-        except JobTimeoutException:
+        except BaseTimeoutException:
             # Weird case: we're done with the with body, but now the alarm is
             # fired.  We may safely ignore this situation and consider the
             # body done.
@@ -33,7 +46,7 @@ class BaseDeathPenalty(object):
 
         # __exit__ may return True to suppress further exception handling.  We
         # don't want to suppress any exceptions here, since all errors should
-        # just pass through, JobTimeoutException being handled normally to the
+        # just pass through, BaseTimeoutException being handled normally to the
         # invoking context.
         return False
 
@@ -47,13 +60,12 @@
 class UnixSignalDeathPenalty(BaseDeathPenalty):
 
     def handle_death_penalty(self, signum, frame):
-        raise JobTimeoutException('Job exceeded maximum timeout '
-                                  'value ({0} seconds)'.format(self._timeout))
+        raise self._exception('Task exceeded maximum timeout value '
+                              '({0} seconds)'.format(self._timeout))
 
     def setup_death_penalty(self):
         """Sets up an alarm signal and a signal handler that raises
-        a JobTimeoutException after the timeout amount (expressed in
-        seconds).
+        an exception after the timeout amount (expressed in seconds).
         """
         signal.signal(signal.SIGALRM, self.handle_death_penalty)
         signal.alarm(self._timeout)
diff --git a/rq/worker.py b/rq/worker.py
index 286403e..aeed67d 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -24,14 +24,14 @@ from redis import WatchError
 from . import worker_registration
 from .compat import PY2, as_text, string_types, text_type
 from .connections import get_current_connection, push_connection, pop_connection
-from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
+from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL
 from .exceptions import DequeueTimeout, ShutDownImminentException
 from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import Queue, get_failed_queue
 from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
 from .suspension import is_suspended
-from .timeouts import UnixSignalDeathPenalty
+from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
 from .utils import (backend_class, ensure_list, enum, make_colorizer,
                     utcformat, utcnow, utcparse)
 from .version import VERSION
@@ -155,9 +155,11 @@ class Worker(object):
 
         return worker
 
-    def __init__(self, queues, name=None, default_result_ttl=None, connection=None,
-                 exc_handler=None, exception_handlers=None, default_worker_ttl=None,
-                 job_class=None, queue_class=None):  # noqa
+    def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
+                 connection=None, exc_handler=None, exception_handlers=None,
+                 default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
+                 queue_class=None,
+                 job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL):  # noqa
         if connection is None:
             connection = get_current_connection()
         self.connection = connection
@@ -175,13 +177,9 @@ class Worker(object):
         self.validate_queues()
         self._exc_handlers = []
 
-        if default_result_ttl is None:
-            default_result_ttl = DEFAULT_RESULT_TTL
         self.default_result_ttl = default_result_ttl
-
-        if default_worker_ttl is None:
-            default_worker_ttl = DEFAULT_WORKER_TTL
         self.default_worker_ttl = default_worker_ttl
+        self.job_monitoring_interval = job_monitoring_interval
 
         self._state = 'starting'
         self._is_horse = False
@@ -483,7 +481,7 @@ class Worker(object):
                     self.log.info('Stopping on request')
                     break
 
-                timeout = None if burst else max(1, self.default_worker_ttl - 60)
+                timeout = None if burst else max(1, self.default_worker_ttl - 15)
                 result = self.dequeue_job_and_maintain_ttl(timeout)
 
                 if result is None:
@@ -531,7 +529,7 @@
         self.heartbeat()
         return result
 
-    def heartbeat(self, timeout=0, pipeline=None):
+    def heartbeat(self, timeout=None, pipeline=None):
         """Specifies a new worker timeout, typically by extending the
         expiration time of the worker, effectively making this a "heartbeat"
         to not expire the worker until the timeout passes.
@@ -539,10 +537,10 @@
         The next heartbeat should come before this time, or the worker will
         die (at least from the monitoring dashboards).
 
-        The effective timeout can never be shorter than default_worker_ttl,
-        only larger.
+        If no timeout is given, the default_worker_ttl will be used to update
+        the expiration time of the worker.
         """
-        timeout = max(timeout, self.default_worker_ttl)
+        timeout = timeout or self.default_worker_ttl
         connection = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, timeout)
         connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
@@ -610,8 +608,13 @@
         """
         while True:
            try:
-                self._monitor_work_horse_tick(job)
+                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
                 break
+            except HorseMonitorTimeoutException:
+                # Horse has not exited yet and is still running.
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat(self.job_monitoring_interval + 5)
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
                 # caused by a SIGINT or SIGTERM signal during
@@ -621,9 +624,9 @@
                 # which we don't want to catch, so we re-raise those ones.
                 if e.errno != errno.EINTR:
                     raise
+                # Send a heartbeat to keep the worker alive.
+                self.heartbeat()
 
-    def _monitor_work_horse_tick(self, job):
-        _, ret_val = os.waitpid(self._horse_pid, 0)
         if ret_val == os.EX_OK:  # The process exited normally.
             return
         job_status = job.get_status()
@@ -696,7 +699,7 @@
         with self.connection._pipeline() as pipeline:
             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
-            self.heartbeat(timeout, pipeline=pipeline)
+            self.heartbeat(self.job_monitoring_interval + 5, pipeline=pipeline)
             registry = StartedJobRegistry(job.origin, self.connection,
                                           job_class=self.job_class)
 
@@ -785,7 +788,8 @@
 
         try:
             job.started_at = utcnow()
-            with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
+            timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
+            with self.death_penalty_class(timeout, JobTimeoutException):
                 rv = job.perform()
 
             job.ended_at = utcnow()
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5367023..c11c06c 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -24,7 +24,8 @@
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
                             div_by_zero, do_nothing, say_hello, say_pid,
                             run_dummy_heroku_worker, access_self,
-                            modify_self, modify_self_and_error)
+                            modify_self, modify_self_and_error,
+                            long_running_job)
 from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
                 get_current_connection)
@@ -247,6 +248,23 @@
             self.testconn.hdel(w.key, 'birth')
             w.refresh()
 
+    @slow
+    def test_heartbeat_busy(self):
+        """Periodic heartbeats while horse is busy with long jobs"""
+        q = Queue()
+        w = Worker([q], job_monitoring_interval=5)
+
+        for timeout, expected_heartbeats in [(2, 0), (7, 1), (12, 2)]:
+            job = q.enqueue(long_running_job,
+                            args=(timeout,),
+                            timeout=30,
+                            result_ttl=-1)
+            with mock.patch.object(w, 'heartbeat', wraps=w.heartbeat) as mocked:
+                w.execute_job(job, q)
+            self.assertEqual(mocked.call_count, expected_heartbeats)
+            job = Job.fetch(job.id)
+            self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
     def test_work_fails(self):
         """Failing jobs are put on the failed queue."""
         q = Queue()