From 5b5cfdf9ab3c6997a85578d014125849bcbc9290 Mon Sep 17 00:00:00 2001
From: Selwin Ong
Date: Sat, 15 May 2021 20:04:52 +0700
Subject: [PATCH] Jobs that get cleaned up should also be retried (#1467)

---
 rq/job.py              |  19 +++++-
 rq/registry.py         |  12 +++-
 rq/worker.py           |  21 +++----
 tests/test_job.py      |  68 +-------------------
 tests/test_registry.py |   3 +-
 tests/test_retry.py    | 139 +++++++++++++++++++++++++++++++++++++++++
 tests/test_worker.py   |  24 +------
 7 files changed, 175 insertions(+), 111 deletions(-)
 create mode 100644 tests/test_retry.py

diff --git a/rq/job.py b/rq/job.py
index 547f12b..d13de49 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -10,6 +10,7 @@ import zlib
 import asyncio
 
 from collections.abc import Iterable
+from datetime import datetime, timedelta, timezone
 from distutils.version import StrictVersion
 from enum import Enum
 from functools import partial
@@ -500,7 +501,7 @@ class Job:
         if result:
             try:
                 self._result = self.serializer.loads(obj.get('result'))
-            except Exception as e:
+            except Exception:
                 self._result = "Unserializable return value"
         self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
         self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
@@ -509,8 +510,8 @@ class Job:
 
         dep_ids = obj.get('dependency_ids')
         dep_id = obj.get('dependency_id')  # for backwards compatibility
-        self._dependency_ids = ( json.loads(dep_ids.decode()) if dep_ids
-                                  else [dep_id.decode()] if dep_id else [] )
+        self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
+                                else [dep_id.decode()] if dep_id else [])
 
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -808,6 +809,17 @@ class Job:
         index = max(number_of_intervals - self.retries_left, 0)
         return self.retry_intervals[index]
 
+    def retry(self, queue, pipeline):
+        """Requeue or schedule this job for execution"""
+        retry_interval = self.get_retry_interval()
+        self.retries_left = self.retries_left - 1
+        if retry_interval:
+            scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
+            self.set_status(JobStatus.SCHEDULED)
+            queue.schedule_job(self, scheduled_datetime, pipeline=pipeline)
+        else:
+            queue.enqueue_job(self, pipeline=pipeline)
+
     def register_dependency(self, pipeline=None):
         """Jobs may have dependencies. Jobs are enqueued only if the jobs they
         depend on are successfully performed. We record this relation as
@@ -874,6 +886,7 @@ class Job:
             if status
         )
 
+
 _job_stack = LocalStack()

diff --git a/rq/registry.py b/rq/registry.py
index 96123b1..7c25cab 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -137,13 +137,21 @@ class StartedJobRegistry(BaseRegistry):
                 try:
                     job = self.job_class.fetch(job_id,
                                                connection=self.connection)
+                except NoSuchJobError:
+                    continue
+
+                retry = job.retries_left and job.retries_left > 0
+
+                if retry:
+                    queue = self.get_queue()
+                    job.retry(queue, pipeline)
+
+                else:
                     job.set_status(JobStatus.FAILED)
                     job.exc_info = "Moved to FailedJobRegistry at %s" % datetime.now()
                     job.save(pipeline=pipeline, include_meta=False)
                     job.cleanup(ttl=-1, pipeline=pipeline)
                     failed_job_registry.add(job, job.failure_ttl)
-                except NoSuchJobError:
-                    pass
 
             pipeline.zremrangebyscore(self.key, 0, score)
             pipeline.execute()

diff --git a/rq/worker.py b/rq/worker.py
index 3d44ca8..7a1b94e 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -13,7 +13,7 @@ import time
 import traceback
 import warnings
 
-from datetime import datetime, timedelta, timezone
+from datetime import timedelta
 from distutils.version import StrictVersion
 from enum import Enum
 from uuid import uuid4
@@ -295,7 +295,7 @@ class Worker:
         now_in_string = utcformat(now)
         self.birth_date = now
 
-        mapping={
+        mapping = {
             'birth': now_in_string,
             'last_heartbeat': now_in_string,
             'queues': queues,
@@ -673,7 +673,7 @@ class Worker:
                     pass
                 except redis.exceptions.ConnectionError as conn_err:
                     self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
-                            conn_err, connection_wait_time)
+                                   conn_err, connection_wait_time)
                     time.sleep(connection_wait_time)
                     connection_wait_time *= self.exponential_backoff_factor
                     connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
@@ -760,7 +760,7 @@ class Worker:
         if child_pid == 0:
             os.setsid()
             self.main_work_horse(job, queue)
-            os._exit(0) # just in case
+            os._exit(0)  # just in case
         else:
             self._horse_pid = child_pid
             self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@@ -842,7 +842,7 @@ class Worker:
                 self.handle_job_failure(
                     job, queue=queue,
                     exc_string="Work-horse was terminated unexpectedly "
-                        "(waitpid returned %s)" % ret_val
+                               "(waitpid returned %s)" % ret_val
                 )
 
     def execute_job(self, job, queue):
@@ -951,14 +951,7 @@ class Worker:
                 )
 
             if retry:
-                retry_interval = job.get_retry_interval()
-                job.retries_left = job.retries_left - 1
-                if retry_interval:
-                    scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
-                    job.set_status(JobStatus.SCHEDULED)
-                    queue.schedule_job(job, scheduled_datetime, pipeline=pipeline)
-                else:
-                    queue.enqueue_job(job, pipeline=pipeline)
+                job.retry(queue, pipeline)
 
             try:
                 pipeline.execute()
@@ -1188,7 +1181,7 @@ class RoundRobinWorker(Worker):
     """
     def reorder_queues(self, reference_queue):
         pos = self._ordered_queues.index(reference_queue)
-        self._ordered_queues = self._ordered_queues[pos+1:] + self._ordered_queues[:pos+1]
+        self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
 
 
 class RandomWorker(Worker):

diff --git a/tests/test_job.py b/tests/test_job.py
index f50b03e..0d43d2b 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,6 +1,4 @@
 # -*- coding: utf-8 -*-
-from __future__ import (absolute_import, division, print_function,
-                        unicode_literals)
 
 import json
 import time
@@ -12,7 +10,7 @@ from redis import WatchError
 
 from rq.compat import as_text
 from rq.exceptions import NoSuchJobError
-from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry
+from rq.job import Job, JobStatus, cancel_job, get_current_job
 from rq.queue import Queue
 from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
                          FinishedJobRegistry, StartedJobRegistry,
@@ -222,19 +220,6 @@ class TestJob(RQTestCase):
         job.heartbeat(ts, 0)
         self.assertEqual(job.last_heartbeat, ts)
 
-    def test_persistence_of_retry_data(self):
-        """Retry related data is stored and restored properly"""
-        job = Job.create(func=fixtures.some_calculation)
-        job.retries_left = 3
-        job.retry_intervals = [1, 2, 3]
-        job.save()
-
-        job.retries_left = None
-        job.retry_intervals = None
-        job.refresh()
-        self.assertEqual(job.retries_left, 3)
-        self.assertEqual(job.retry_intervals, [1, 2, 3])
-
     def test_persistence_of_parent_job(self):
         """Storing jobs with parent job, either instance or key."""
         parent_job = Job.create(func=fixtures.some_calculation)
@@ -610,7 +595,6 @@ class TestJob(RQTestCase):
 
     def test_job_delete_removes_itself_from_registries(self):
         """job.delete() should remove itself from job registries"""
-        connection = self.testconn
         job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED,
                          connection=self.testconn, origin='default')
         job.save()
@@ -836,7 +820,6 @@ class TestJob(RQTestCase):
         self.assertListEqual(dependencies, [dependency_job])
 
     def test_fetch_dependencies_returns_empty_if_not_dependent_job(self):
-        queue = Queue(connection=self.testconn)
         dependent_job = Job.create(func=fixtures.say_hello)
         dependent_job.register_dependency()
@@ -901,8 +884,6 @@ class TestJob(RQTestCase):
         self.assertFalse(dependencies_finished)
 
     def test_dependencies_finished_returns_true_if_no_dependencies(self):
-        queue = Queue(connection=self.testconn)
-
         dependent_job = Job.create(func=fixtures.say_hello)
         dependent_job.register_dependency()
@@ -947,8 +928,6 @@ class TestJob(RQTestCase):
         dependent_job._dependency_ids = [job.id for job in dependency_jobs]
         dependent_job.register_dependency()
 
-        now = utcnow()
-
         dependencies_finished = dependent_job.dependencies_are_met()
 
         self.assertFalse(dependencies_finished)
@@ -1067,48 +1046,3 @@ class TestJob(RQTestCase):
         self.assertEqual(queue.count, 0)
         self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
         self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
-
-    def test_retry(self):
-        """Retry parses `max` and `interval` correctly"""
-        retry = Retry(max=1)
-        self.assertEqual(retry.max, 1)
-        self.assertEqual(retry.intervals, [0])
-        self.assertRaises(ValueError, Retry, max=0)
-
-        retry = Retry(max=2, interval=5)
-        self.assertEqual(retry.max, 2)
-        self.assertEqual(retry.intervals, [5])
-
-        retry = Retry(max=3, interval=[5, 10])
-        self.assertEqual(retry.max, 3)
-        self.assertEqual(retry.intervals, [5, 10])
-
-        # interval can't be negative
-        self.assertRaises(ValueError, Retry, max=1, interval=-5)
-        self.assertRaises(ValueError, Retry, max=1, interval=[1, -5])
-
-    def test_get_retry_interval(self):
-        """get_retry_interval() returns the right retry interval"""
-        job = Job.create(func=fixtures.say_hello)
-
-        # Handle case where self.retry_intervals is None
-        job.retries_left = 2
-        self.assertEqual(job.get_retry_interval(), 0)
-
-        # Handle the most common case
-        job.retry_intervals = [1, 2]
-        self.assertEqual(job.get_retry_interval(), 1)
-        job.retries_left = 1
-        self.assertEqual(job.get_retry_interval(), 2)
-
-        # Handle cases where number of retries > length of interval
-        job.retries_left = 4
-        job.retry_intervals = [1, 2, 3]
-        self.assertEqual(job.get_retry_interval(), 1)
-        job.retries_left = 3
-        self.assertEqual(job.get_retry_interval(), 1)
-        job.retries_left = 2
-        self.assertEqual(job.get_retry_interval(), 2)
-        job.retries_left = 1
-        self.assertEqual(job.get_retry_interval(), 3)
-

diff --git a/tests/test_registry.py b/tests/test_registry.py
index 83f80f4..57f3e1d 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -114,7 +114,6 @@ class TestRegistry(RQTestCase):
         self.assertIsNone(self.testconn.zscore(self.registry.key, job.id))
         self.assertFalse(self.testconn.exists(job.key))
 
-
     def test_get_job_ids(self):
         """Getting job ids from StartedJobRegistry."""
         timestamp = current_timestamp()
@@ -152,7 +151,7 @@ class TestRegistry(RQTestCase):
         self.assertNotIn(job, self.registry)
         job.refresh()
         self.assertEqual(job.get_status(), JobStatus.FAILED)
-        self.assertTrue(job.exc_info) # explanation is written to exc_info
+        self.assertTrue(job.exc_info)  # explanation is written to exc_info
 
     def test_job_execution(self):
         """Job is removed from StartedJobRegistry after execution."""

diff --git a/tests/test_retry.py b/tests/test_retry.py
new file mode 100644
index 0000000..ad01b37
--- /dev/null
+++ b/tests/test_retry.py
@@ -0,0 +1,139 @@
+from datetime import datetime, timedelta, timezone
+
+from rq.job import Job, JobStatus, Retry
+from rq.queue import Queue
+from rq.registry import FailedJobRegistry, StartedJobRegistry
+from rq.worker import Worker
+from tests import RQTestCase, fixtures
+from tests.fixtures import div_by_zero, say_hello
+
+
+class TestRetry(RQTestCase):
+
+    def test_persistence_of_retry_data(self):
+        """Retry related data is stored and restored properly"""
+        job = Job.create(func=fixtures.some_calculation)
+        job.retries_left = 3
+        job.retry_intervals = [1, 2, 3]
+        job.save()
+
+        job.retries_left = None
+        job.retry_intervals = None
+        job.refresh()
+        self.assertEqual(job.retries_left, 3)
+        self.assertEqual(job.retry_intervals, [1, 2, 3])
+
+    def test_retry_class(self):
+        """Retry parses `max` and `interval` correctly"""
+        retry = Retry(max=1)
+        self.assertEqual(retry.max, 1)
+        self.assertEqual(retry.intervals, [0])
+        self.assertRaises(ValueError, Retry, max=0)
+
+        retry = Retry(max=2, interval=5)
+        self.assertEqual(retry.max, 2)
+        self.assertEqual(retry.intervals, [5])
+
+        retry = Retry(max=3, interval=[5, 10])
+        self.assertEqual(retry.max, 3)
+        self.assertEqual(retry.intervals, [5, 10])
+
+        # interval can't be negative
+        self.assertRaises(ValueError, Retry, max=1, interval=-5)
+        self.assertRaises(ValueError, Retry, max=1, interval=[1, -5])
+
+    def test_get_retry_interval(self):
+        """get_retry_interval() returns the right retry interval"""
+        job = Job.create(func=fixtures.say_hello)
+
+        # Handle case where self.retry_intervals is None
+        job.retries_left = 2
+        self.assertEqual(job.get_retry_interval(), 0)
+
+        # Handle the most common case
+        job.retry_intervals = [1, 2]
+        self.assertEqual(job.get_retry_interval(), 1)
+        job.retries_left = 1
+        self.assertEqual(job.get_retry_interval(), 2)
+
+        # Handle cases where number of retries > length of interval
+        job.retries_left = 4
+        job.retry_intervals = [1, 2, 3]
+        self.assertEqual(job.get_retry_interval(), 1)
+        job.retries_left = 3
+        self.assertEqual(job.get_retry_interval(), 1)
+        job.retries_left = 2
+        self.assertEqual(job.get_retry_interval(), 2)
+        job.retries_left = 1
+        self.assertEqual(job.get_retry_interval(), 3)
+
+    def test_job_retry(self):
+        """Test job.retry() works properly"""
+        queue = Queue(connection=self.testconn)
+        retry = Retry(max=3, interval=5)
+        job = queue.enqueue(div_by_zero, retry=retry)
+
+        with self.testconn.pipeline() as pipeline:
+            job.retry(queue, pipeline)
+            pipeline.execute()
+
+        self.assertEqual(job.retries_left, 2)
+        # status should be scheduled since it's retried with 5 seconds interval
+        self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
+
+        retry = Retry(max=3)
+        job = queue.enqueue(div_by_zero, retry=retry)
+
+        with self.testconn.pipeline() as pipeline:
+            job.retry(queue, pipeline)
+
+            pipeline.execute()
+
+        self.assertEqual(job.retries_left, 2)
+        # status should be queued
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+
+    def test_retry_interval(self):
+        """Retries with intervals are scheduled"""
+        connection = self.testconn
+        queue = Queue(connection=connection)
+        retry = Retry(max=1, interval=5)
+        job = queue.enqueue(div_by_zero, retry=retry)
+
+        worker = Worker([queue])
+        registry = queue.scheduled_job_registry
+        # If job if configured to retry with interval, it will be scheduled,
+        # not directly put back in the queue
+        queue.empty()
+        worker.handle_job_failure(job, queue)
+        job.refresh()
+        self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
+        self.assertEqual(job.retries_left, 0)
+        self.assertEqual(len(registry), 1)
+        self.assertEqual(queue.job_ids, [])
+        # Scheduled time is roughly 5 seconds from now
+        scheduled_time = registry.get_scheduled_time(job)
+        now = datetime.now(timezone.utc)
+        self.assertTrue(now + timedelta(seconds=4) < scheduled_time < now + timedelta(seconds=6))
+
+    def test_cleanup_handles_retries(self):
+        """Expired jobs should also be retried"""
+        queue = Queue(connection=self.testconn)
+        registry = StartedJobRegistry(connection=self.testconn)
+        failed_job_registry = FailedJobRegistry(connection=self.testconn)
+        job = queue.enqueue(say_hello, retry=Retry(max=1))
+
+        # Add job to StartedJobRegistry with past expiration time
+        self.testconn.zadd(registry.key, {job.id: 2})
+
+        registry.cleanup()
+        self.assertEqual(len(queue), 2)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+        self.assertNotIn(job, failed_job_registry)
+
+        self.testconn.zadd(registry.key, {job.id: 2})
+        # Job goes to FailedJobRegistry because it's only retried once
+        registry.cleanup()
+        self.assertEqual(len(queue), 2)
+        self.assertEqual(job.get_status(), JobStatus.FAILED)
+        self.assertIn(job, failed_job_registry)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index 1f11bfb..05d6fef 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -41,6 +41,7 @@ from rq.version import VERSION
 from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker
 from rq.serializers import JSONSerializer
 
+
 class CustomJob(Job):
     pass
@@ -442,29 +443,6 @@ class TestWorker(RQTestCase):
         # If a job is no longer retries, it's put in FailedJobRegistry
         self.assertTrue(job in registry)
 
-    def test_retry_interval(self):
-        """Retries with intervals are scheduled"""
-        connection = self.testconn
-        queue = Queue(connection=connection)
-        retry = Retry(max=1, interval=5)
-        job = queue.enqueue(div_by_zero, retry=retry)
-
-        worker = Worker([queue])
-        registry = queue.scheduled_job_registry
-        # If job if configured to retry with interval, it will be scheduled,
-        # not directly put back in the queue
-        queue.empty()
-        worker.handle_job_failure(job, queue)
-        job.refresh()
-        self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
-        self.assertEqual(job.retries_left, 0)
-        self.assertEqual(len(registry), 1)
-        self.assertEqual(queue.job_ids, [])
-        # Scheduled time is roughly 5 seconds from now
-        scheduled_time = registry.get_scheduled_time(job)
-        now = datetime.now(timezone.utc)
-        self.assertTrue(now + timedelta(seconds=4) < scheduled_time < now + timedelta(seconds=6))
-
     def test_total_working_time(self):
         """worker.total_working_time is stored properly"""
         queue = Queue()
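
Usage sketch (illustrative only, not part of the patch): roughly how the retry
behaviour added here looks from the caller's side. The `my_task` function, the
Redis connection, and the interval values are placeholder assumptions; Queue,
Retry, and StartedJobRegistry.cleanup() are the same APIs exercised by the
tests above.

    from redis import Redis

    from rq.job import Retry
    from rq.queue import Queue
    from rq.registry import StartedJobRegistry


    def my_task():
        # Placeholder task; in practice any importable function can be enqueued.
        return 42


    redis = Redis()
    queue = Queue(connection=redis)

    # Allow up to 3 retries, waiting 10s, 30s and 60s between attempts.
    job = queue.enqueue(my_task, retry=Retry(max=3, interval=[10, 30, 60]))

    # Before this change, a job whose StartedJobRegistry entry expired (for
    # example because its work horse died) was moved straight to
    # FailedJobRegistry. With this change, cleanup() calls job.retry() instead,
    # re-enqueueing the job (or scheduling it when an interval is configured)
    # as long as job.retries_left > 0.
    registry = StartedJobRegistry(queue.name, connection=redis)
    registry.cleanup()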