Add exception to catch redis connection failure to retry after wait time (#1387)

* add exception catch for redis connection failure

* Add test for connection recovery

* add exponential backoff

* limit worker max connection wait time to 60 seconds

* fix undefined class variable

* fix string formatting issue while printing error log

* cap max connection wait time:better code style

Co-authored-by: corynezin <cory.nezin@gmail.com>
main
Adda Satya Ram 4 years ago committed by GitHub
parent 709043989a
commit 11c8631921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -22,7 +22,7 @@ try:
except ImportError: except ImportError:
from signal import SIGTERM as SIGKILL from signal import SIGTERM as SIGKILL
from redis import WatchError import redis.exceptions
from . import worker_registration from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
@ -106,6 +106,10 @@ class Worker(object):
log_result_lifespan = True log_result_lifespan = True
# `log_job_description` is used to toggle logging an entire jobs description. # `log_job_description` is used to toggle logging an entire jobs description.
log_job_description = True log_job_description = True
# factor to increase connection_wait_time incase of continous connection failures.
exponential_backoff_factor = 2.0
# Max Wait time (in seconds) after which exponential_backoff_factor wont be applicable.
max_connection_wait_time = 60.0
@classmethod @classmethod
def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None): def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None):
@ -469,7 +473,6 @@ class Worker(object):
def check_for_suspension(self, burst): def check_for_suspension(self, burst):
"""Check to see if workers have been suspended by `rq suspend`""" """Check to see if workers have been suspended by `rq suspend`"""
before_state = None before_state = None
notified = False notified = False
@ -628,14 +631,15 @@ class Worker(object):
self.set_state(WorkerStatus.IDLE) self.set_state(WorkerStatus.IDLE)
self.procline('Listening on ' + qnames) self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames)) self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0
while True: while True:
try:
self.heartbeat() self.heartbeat()
if self.should_run_maintenance_tasks: if self.should_run_maintenance_tasks:
self.run_maintenance_tasks() self.run_maintenance_tasks()
try:
result = self.queue_class.dequeue_any(self.queues, timeout, result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection, connection=self.connection,
job_class=self.job_class, job_class=self.job_class,
@ -654,6 +658,14 @@ class Worker(object):
break break
except DequeueTimeout: except DequeueTimeout:
pass pass
except redis.exceptions.ConnectionError as conn_err:
self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
conn_err, connection_wait_time)
time.sleep(connection_wait_time)
connection_wait_time *= self.exponential_backoff_factor
connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
else:
connection_wait_time = 1.0
self.heartbeat() self.heartbeat()
return result return result
@ -955,7 +967,7 @@ class Worker(object):
pipeline.execute() pipeline.execute()
break break
except WatchError: except redis.exceptions.WatchError:
continue continue
def perform_job(self, job, queue, heartbeat_ttl=None): def perform_job(self, job, queue, heartbeat_ttl=None):

@ -18,6 +18,7 @@ from time import sleep
from unittest import skipIf from unittest import skipIf
import redis.exceptions
import pytest import pytest
import mock import mock
from mock import Mock from mock import Mock
@ -283,6 +284,19 @@ class TestWorker(RQTestCase):
self.testconn.hdel(w.key, 'birth') self.testconn.hdel(w.key, 'birth')
w.refresh() w.refresh()
@slow
def test_heartbeat_survives_lost_connection(self):
with mock.patch.object(Worker, 'heartbeat') as mocked:
# None -> Heartbeat is first called before the job loop
mocked.side_effect = [None, redis.exceptions.ConnectionError()]
q = Queue()
w = Worker([q])
w.work(burst=True)
# First call is prior to job loop, second raises the error,
# third is successful, after "recovery"
assert mocked.call_count == 3
@slow @slow
def test_heartbeat_busy(self): def test_heartbeat_busy(self):
"""Periodic heartbeats while horse is busy with long jobs""" """Periodic heartbeats while horse is busy with long jobs"""

Loading…
Cancel
Save