Jobs that get cleaned up should also be retried (#1467)

main
Selwin Ong 4 years ago committed by GitHub
parent d333d20914
commit 5b5cfdf9ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,6 +10,7 @@ import zlib
import asyncio import asyncio
from collections.abc import Iterable from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from distutils.version import StrictVersion from distutils.version import StrictVersion
from enum import Enum from enum import Enum
from functools import partial from functools import partial
@ -500,7 +501,7 @@ class Job:
if result: if result:
try: try:
self._result = self.serializer.loads(obj.get('result')) self._result = self.serializer.loads(obj.get('result'))
except Exception as e: except Exception:
self._result = "Unserializable return value" self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
@ -509,8 +510,8 @@ class Job:
dep_ids = obj.get('dependency_ids') dep_ids = obj.get('dependency_ids')
dep_id = obj.get('dependency_id') # for backwards compatibility dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = ( json.loads(dep_ids.decode()) if dep_ids self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else [] ) else [dep_id.decode()] if dep_id else [])
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@ -808,6 +809,17 @@ class Job:
index = max(number_of_intervals - self.retries_left, 0) index = max(number_of_intervals - self.retries_left, 0)
return self.retry_intervals[index] return self.retry_intervals[index]
def retry(self, queue, pipeline):
    """Send this job back for another attempt, using up one retry.

    The wait time is read from ``get_retry_interval()`` before the
    ``retries_left`` counter is decremented. When that wait time is
    truthy the job is marked as scheduled and passed to
    ``queue.schedule_job`` with a timezone-aware UTC run time; otherwise
    it is passed straight to ``queue.enqueue_job``. ``pipeline`` is
    forwarded to whichever queue call is made.
    """
    # Must be looked up first: the interval depends on retries_left.
    delay = self.get_retry_interval()
    self.retries_left -= 1

    if not delay:
        queue.enqueue_job(self, pipeline=pipeline)
        return

    run_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
    self.set_status(JobStatus.SCHEDULED)
    queue.schedule_job(self, run_at, pipeline=pipeline)
def register_dependency(self, pipeline=None): def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the jobs they """Jobs may have dependencies. Jobs are enqueued only if the jobs they
depend on are successfully performed. We record this relation as depend on are successfully performed. We record this relation as
@ -874,6 +886,7 @@ class Job:
if status if status
) )
_job_stack = LocalStack() _job_stack = LocalStack()

@ -137,13 +137,21 @@ class StartedJobRegistry(BaseRegistry):
try: try:
job = self.job_class.fetch(job_id, job = self.job_class.fetch(job_id,
connection=self.connection) connection=self.connection)
except NoSuchJobError:
continue
retry = job.retries_left and job.retries_left > 0
if retry:
queue = self.get_queue()
job.retry(queue, pipeline)
else:
job.set_status(JobStatus.FAILED) job.set_status(JobStatus.FAILED)
job.exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() job.exc_info = "Moved to FailedJobRegistry at %s" % datetime.now()
job.save(pipeline=pipeline, include_meta=False) job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl) failed_job_registry.add(job, job.failure_ttl)
except NoSuchJobError:
pass
pipeline.zremrangebyscore(self.key, 0, score) pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute() pipeline.execute()

@ -13,7 +13,7 @@ import time
import traceback import traceback
import warnings import warnings
from datetime import datetime, timedelta, timezone from datetime import timedelta
from distutils.version import StrictVersion from distutils.version import StrictVersion
from enum import Enum from enum import Enum
from uuid import uuid4 from uuid import uuid4
@ -295,7 +295,7 @@ class Worker:
now_in_string = utcformat(now) now_in_string = utcformat(now)
self.birth_date = now self.birth_date = now
mapping={ mapping = {
'birth': now_in_string, 'birth': now_in_string,
'last_heartbeat': now_in_string, 'last_heartbeat': now_in_string,
'queues': queues, 'queues': queues,
@ -673,7 +673,7 @@ class Worker:
pass pass
except redis.exceptions.ConnectionError as conn_err: except redis.exceptions.ConnectionError as conn_err:
self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
conn_err, connection_wait_time) conn_err, connection_wait_time)
time.sleep(connection_wait_time) time.sleep(connection_wait_time)
connection_wait_time *= self.exponential_backoff_factor connection_wait_time *= self.exponential_backoff_factor
connection_wait_time = min(connection_wait_time, self.max_connection_wait_time) connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
@ -760,7 +760,7 @@ class Worker:
if child_pid == 0: if child_pid == 0:
os.setsid() os.setsid()
self.main_work_horse(job, queue) self.main_work_horse(job, queue)
os._exit(0) # just in case os._exit(0) # just in case
else: else:
self._horse_pid = child_pid self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time())) self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@ -842,7 +842,7 @@ class Worker:
self.handle_job_failure( self.handle_job_failure(
job, queue=queue, job, queue=queue,
exc_string="Work-horse was terminated unexpectedly " exc_string="Work-horse was terminated unexpectedly "
"(waitpid returned %s)" % ret_val "(waitpid returned %s)" % ret_val
) )
def execute_job(self, job, queue): def execute_job(self, job, queue):
@ -951,14 +951,7 @@ class Worker:
) )
if retry: if retry:
retry_interval = job.get_retry_interval() job.retry(queue, pipeline)
job.retries_left = job.retries_left - 1
if retry_interval:
scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
job.set_status(JobStatus.SCHEDULED)
queue.schedule_job(job, scheduled_datetime, pipeline=pipeline)
else:
queue.enqueue_job(job, pipeline=pipeline)
try: try:
pipeline.execute() pipeline.execute()
@ -1188,7 +1181,7 @@ class RoundRobinWorker(Worker):
""" """
def reorder_queues(self, reference_queue): def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue) pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos+1:] + self._ordered_queues[:pos+1] self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
class RandomWorker(Worker): class RandomWorker(Worker):

@ -1,6 +1,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import json import json
import time import time
@ -12,7 +10,7 @@ from redis import WatchError
from rq.compat import as_text from rq.compat import as_text
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry, from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry, FinishedJobRegistry, StartedJobRegistry,
@ -222,19 +220,6 @@ class TestJob(RQTestCase):
job.heartbeat(ts, 0) job.heartbeat(ts, 0)
self.assertEqual(job.last_heartbeat, ts) self.assertEqual(job.last_heartbeat, ts)
def test_persistence_of_retry_data(self):
    """Retry counters and intervals survive a save/refresh round trip"""
    job = Job.create(func=fixtures.some_calculation)
    job.retries_left, job.retry_intervals = 3, [1, 2, 3]
    job.save()

    # Blank out the in-memory values so the assertions below can only
    # pass if refresh() brings them back.
    job.retries_left = job.retry_intervals = None
    job.refresh()

    self.assertEqual(job.retries_left, 3)
    self.assertEqual(job.retry_intervals, [1, 2, 3])
def test_persistence_of_parent_job(self): def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key.""" """Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=fixtures.some_calculation) parent_job = Job.create(func=fixtures.some_calculation)
@ -610,7 +595,6 @@ class TestJob(RQTestCase):
def test_job_delete_removes_itself_from_registries(self): def test_job_delete_removes_itself_from_registries(self):
"""job.delete() should remove itself from job registries""" """job.delete() should remove itself from job registries"""
connection = self.testconn
job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED, job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED,
connection=self.testconn, origin='default') connection=self.testconn, origin='default')
job.save() job.save()
@ -836,7 +820,6 @@ class TestJob(RQTestCase):
self.assertListEqual(dependencies, [dependency_job]) self.assertListEqual(dependencies, [dependency_job])
def test_fetch_dependencies_returns_empty_if_not_dependent_job(self): def test_fetch_dependencies_returns_empty_if_not_dependent_job(self):
queue = Queue(connection=self.testconn)
dependent_job = Job.create(func=fixtures.say_hello) dependent_job = Job.create(func=fixtures.say_hello)
dependent_job.register_dependency() dependent_job.register_dependency()
@ -901,8 +884,6 @@ class TestJob(RQTestCase):
self.assertFalse(dependencies_finished) self.assertFalse(dependencies_finished)
def test_dependencies_finished_returns_true_if_no_dependencies(self): def test_dependencies_finished_returns_true_if_no_dependencies(self):
queue = Queue(connection=self.testconn)
dependent_job = Job.create(func=fixtures.say_hello) dependent_job = Job.create(func=fixtures.say_hello)
dependent_job.register_dependency() dependent_job.register_dependency()
@ -947,8 +928,6 @@ class TestJob(RQTestCase):
dependent_job._dependency_ids = [job.id for job in dependency_jobs] dependent_job._dependency_ids = [job.id for job in dependency_jobs]
dependent_job.register_dependency() dependent_job.register_dependency()
now = utcnow()
dependencies_finished = dependent_job.dependencies_are_met() dependencies_finished = dependent_job.dependencies_are_met()
self.assertFalse(dependencies_finished) self.assertFalse(dependencies_finished)
@ -1067,48 +1046,3 @@ class TestJob(RQTestCase):
self.assertEqual(queue.count, 0) self.assertEqual(queue.count, 0)
self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B])) self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"]) self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
def test_retry(self):
    """Retry accepts valid `max`/`interval` values and rejects bad ones"""
    accepted = (
        ({'max': 1}, [0]),
        ({'max': 2, 'interval': 5}, [5]),
        ({'max': 3, 'interval': [5, 10]}, [5, 10]),
    )
    for kwargs, expected_intervals in accepted:
        retry = Retry(**kwargs)
        self.assertEqual(retry.max, kwargs['max'])
        self.assertEqual(retry.intervals, expected_intervals)

    # max of zero and negative intervals must raise ValueError
    rejected = (
        {'max': 0},
        {'max': 1, 'interval': -5},
        {'max': 1, 'interval': [1, -5]},
    )
    for kwargs in rejected:
        with self.assertRaises(ValueError):
            Retry(**kwargs)
def test_get_retry_interval(self):
    """get_retry_interval() picks the interval matching retries_left"""
    job = Job.create(func=fixtures.say_hello)

    # retry_intervals has not been assigned yet
    job.retries_left = 2
    self.assertEqual(job.get_retry_interval(), 0)

    # Each entry: intervals to configure, then (retries_left, expected)
    # pairs. The second entry includes retries_left values larger than
    # the number of intervals.
    scenarios = (
        ([1, 2], ((2, 1), (1, 2))),
        ([1, 2, 3], ((4, 1), (3, 1), (2, 2), (1, 3))),
    )
    for intervals, expectations in scenarios:
        job.retry_intervals = intervals
        for retries_left, expected in expectations:
            job.retries_left = retries_left
            self.assertEqual(job.get_retry_interval(), expected)

@ -114,7 +114,6 @@ class TestRegistry(RQTestCase):
self.assertIsNone(self.testconn.zscore(self.registry.key, job.id)) self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
self.assertFalse(self.testconn.exists(job.key)) self.assertFalse(self.testconn.exists(job.key))
def test_get_job_ids(self): def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry.""" """Getting job ids from StartedJobRegistry."""
timestamp = current_timestamp() timestamp = current_timestamp()

@ -0,0 +1,139 @@
from datetime import datetime, timedelta, timezone
from rq.job import Job, JobStatus, Retry
from rq.queue import Queue
from rq.registry import FailedJobRegistry, StartedJobRegistry
from rq.worker import Worker
from tests import RQTestCase, fixtures
from tests.fixtures import div_by_zero, say_hello
class TestRetry(RQTestCase):
    """Tests for retry bookkeeping, retry scheduling and registry cleanup."""

    def test_persistence_of_retry_data(self):
        """Retry counters and intervals survive a save/refresh round trip"""
        job = Job.create(func=fixtures.some_calculation)
        job.retries_left, job.retry_intervals = 3, [1, 2, 3]
        job.save()

        # Blank out the in-memory values so the assertions below can only
        # pass if refresh() brings them back.
        job.retries_left = job.retry_intervals = None
        job.refresh()

        self.assertEqual(job.retries_left, 3)
        self.assertEqual(job.retry_intervals, [1, 2, 3])

    def test_retry_class(self):
        """Retry accepts valid `max`/`interval` values and rejects bad ones"""
        accepted = (
            ({'max': 1}, [0]),
            ({'max': 2, 'interval': 5}, [5]),
            ({'max': 3, 'interval': [5, 10]}, [5, 10]),
        )
        for kwargs, expected_intervals in accepted:
            retry = Retry(**kwargs)
            self.assertEqual(retry.max, kwargs['max'])
            self.assertEqual(retry.intervals, expected_intervals)

        # max of zero and negative intervals must raise ValueError
        rejected = (
            {'max': 0},
            {'max': 1, 'interval': -5},
            {'max': 1, 'interval': [1, -5]},
        )
        for kwargs in rejected:
            with self.assertRaises(ValueError):
                Retry(**kwargs)

    def test_get_retry_interval(self):
        """get_retry_interval() picks the interval matching retries_left"""
        job = Job.create(func=fixtures.say_hello)

        # retry_intervals has not been assigned yet
        job.retries_left = 2
        self.assertEqual(job.get_retry_interval(), 0)

        # Each entry: intervals to configure, then (retries_left, expected)
        # pairs. The second entry includes retries_left values larger than
        # the number of intervals.
        scenarios = (
            ([1, 2], ((2, 1), (1, 2))),
            ([1, 2, 3], ((4, 1), (3, 1), (2, 2), (1, 3))),
        )
        for intervals, expectations in scenarios:
            job.retry_intervals = intervals
            for retries_left, expected in expectations:
                job.retries_left = retries_left
                self.assertEqual(job.get_retry_interval(), expected)

    def test_job_retry(self):
        """job.retry() decrements retries_left and sets the right status"""
        queue = Queue(connection=self.testconn)

        def enqueue_then_retry(retry):
            # Enqueue a fresh job and call retry() on it inside a pipeline.
            job = queue.enqueue(div_by_zero, retry=retry)
            with self.testconn.pipeline() as pipeline:
                job.retry(queue, pipeline)
                pipeline.execute()
            return job

        # With a 5 second interval the retried job ends up scheduled
        delayed_job = enqueue_then_retry(Retry(max=3, interval=5))
        self.assertEqual(delayed_job.retries_left, 2)
        self.assertEqual(delayed_job.get_status(), JobStatus.SCHEDULED)

        # Without an interval the retried job ends up queued
        immediate_job = enqueue_then_retry(Retry(max=3))
        self.assertEqual(immediate_job.retries_left, 2)
        self.assertEqual(immediate_job.get_status(), JobStatus.QUEUED)

    def test_retry_interval(self):
        """Retries with intervals are scheduled"""
        queue = Queue(connection=self.testconn)
        job = queue.enqueue(div_by_zero, retry=Retry(max=1, interval=5))
        worker = Worker([queue])
        scheduled_registry = queue.scheduled_job_registry

        # If job is configured to retry with interval, it will be
        # scheduled, not directly put back in the queue
        queue.empty()
        worker.handle_job_failure(job, queue)
        job.refresh()

        self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
        self.assertEqual(job.retries_left, 0)
        self.assertEqual(len(scheduled_registry), 1)
        self.assertEqual(queue.job_ids, [])

        # Scheduled time is roughly 5 seconds from now
        scheduled_time = scheduled_registry.get_scheduled_time(job)
        now = datetime.now(timezone.utc)
        earliest = now + timedelta(seconds=4)
        latest = now + timedelta(seconds=6)
        self.assertTrue(earliest < scheduled_time < latest)

    def test_cleanup_handles_retries(self):
        """Expired jobs should also be retried"""
        queue = Queue(connection=self.testconn)
        started_registry = StartedJobRegistry(connection=self.testconn)
        failed_registry = FailedJobRegistry(connection=self.testconn)
        job = queue.enqueue(say_hello, retry=Retry(max=1))

        def expire_and_cleanup():
            # Add job to StartedJobRegistry with a score (2) far in the
            # past, then run the registry's cleanup.
            self.testconn.zadd(started_registry.key, {job.id: 2})
            started_registry.cleanup()

        # First cleanup: the job still has a retry left
        expire_and_cleanup()
        self.assertEqual(len(queue), 2)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)
        self.assertNotIn(job, failed_registry)

        # Second cleanup: job goes to FailedJobRegistry because it's only
        # retried once
        expire_and_cleanup()
        self.assertEqual(len(queue), 2)
        self.assertEqual(job.get_status(), JobStatus.FAILED)
        self.assertIn(job, failed_registry)

@ -41,6 +41,7 @@ from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker
from rq.serializers import JSONSerializer from rq.serializers import JSONSerializer
class CustomJob(Job): class CustomJob(Job):
pass pass
@ -442,29 +443,6 @@ class TestWorker(RQTestCase):
# If a job is no longer retried, it's put in FailedJobRegistry # If a job is no longer retried, it's put in FailedJobRegistry
self.assertTrue(job in registry) self.assertTrue(job in registry)
def test_retry_interval(self):
    """Retries with intervals are scheduled"""
    queue = Queue(connection=self.testconn)
    job = queue.enqueue(div_by_zero, retry=Retry(max=1, interval=5))
    worker = Worker([queue])
    scheduled_registry = queue.scheduled_job_registry

    # If job is configured to retry with interval, it will be
    # scheduled, not directly put back in the queue
    queue.empty()
    worker.handle_job_failure(job, queue)
    job.refresh()

    self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
    self.assertEqual(job.retries_left, 0)
    self.assertEqual(len(scheduled_registry), 1)
    self.assertEqual(queue.job_ids, [])

    # Scheduled time is roughly 5 seconds from now
    scheduled_time = scheduled_registry.get_scheduled_time(job)
    now = datetime.now(timezone.utc)
    earliest = now + timedelta(seconds=4)
    latest = now + timedelta(seconds=6)
    self.assertTrue(earliest < scheduled_time < latest)
def test_total_working_time(self): def test_total_working_time(self):
"""worker.total_working_time is stored properly""" """worker.total_working_time is stored properly"""
queue = Queue() queue = Queue()

Loading…
Cancel
Save