Job retry feature. Docs WIP (#1299)

* Initial implementation of Retry class

* Fixes job.refresh() under Python 3.5

* Remove the use of text_type in job.py

* Retry can be scheduled

* monitor_work_horse() should call handle_job_failure() with queue argument.

* Flake8 fixes

* Added docs for job retries
main
Selwin Ong 5 years ago committed by GitHub
parent 8a0d9f91ff
commit 49b156ecc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,6 +16,8 @@ navigation:
url: /docs/results/ url: /docs/results/
- text: Jobs - text: Jobs
url: /docs/jobs/ url: /docs/jobs/
- text: Exceptions
url: /docs/exceptions/
- text: Scheduling Jobs - text: Scheduling Jobs
url: /docs/scheduling/ url: /docs/scheduling/
- text: Monitoring - text: Monitoring
@ -24,8 +26,6 @@ navigation:
url: /docs/job_registries/ url: /docs/job_registries/
- text: Connections - text: Connections
url: /docs/connections/ url: /docs/connections/
- text: Exceptions
url: /docs/exceptions/
- text: Testing - text: Testing
url: /docs/testing/ url: /docs/testing/
- text: Patterns - text: Patterns

@ -1,17 +1,64 @@
--- ---
title: "RQ: Exceptions" title: "RQ: Exceptions & Retries"
layout: docs layout: docs
--- ---
Jobs can fail due to exceptions occurring. When your RQ workers run in the Jobs can fail due to exceptions occurring. When your RQ workers run in the
background, how do you get notified of these exceptions? background, how do you get notified of these exceptions?
## Default: the `FailedJobRegistry` ## Default: FailedJobRegistry
The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't
execute successfully is stored here, along with its exception information (type, execute successfully is stored here, along with its exception information (type,
value, traceback). While this makes sure no failing jobs "get lost", this is value, traceback).
of no use to get notified pro-actively about job failure.
```python
from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry
redis = Redis()
queue = Queue(connection=redis)
registry = FailedJobRegistry(queue=queue)
# Show all failed job IDs and the exceptions they caused during runtime
for job_id in registry.get_job_ids():
job = Job.fetch(job_id, connection=redis)
print(job_id, job.exc_info)
```
## Retrying Failed Jobs
_New in version 1.5.0_
RQ lets you easily retry failed jobs. To configure retries, use RQ's
`Retry` object that accepts `max` and `interval` arguments. For example:
```python
from redis import Redis
from rq import Retry, Queue
from somewhere import my_func
queue = Queue(connection=redis)
# Retry up to 3 times, failed job will be requeued immediately
queue.enqueue(my_func, retry=Retry(max=3))
# Retry up to 3 times, with 60 seconds interval in between executions
queue.enqueue(my_func, retry=Retry(max=3, interval=60))
# Retry up to 3 times, with longer interval in between retries
queue.enqueue(my_func, retry=Retry(max=3, interval=[10, 30, 60]))
```
<div class="warning">
<img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
<strong>Note:</strong>
<p>
If you use `interval` argument with `Retry`, don't forget to run your workers using
the `--with-scheduler` argument.
</p>
</div>
## Custom Exception Handlers ## Custom Exception Handlers

@ -20,7 +20,7 @@ RQ to have job scheduling capabilities without:
2. Worrying about a separate `Scheduler` class. 2. Worrying about a separate `Scheduler` class.
# Scheduling Jobs for Execution ## Scheduling Jobs for Execution
There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`. There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.
@ -76,7 +76,7 @@ registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry
``` ```
# Running the Scheduler ## Running the Scheduler
If you use RQ's scheduling features, you need to run RQ workers with the If you use RQ's scheduling features, you need to run RQ workers with the
scheduler component enabled. scheduler component enabled.

@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
from .connections import (Connection, get_current_connection, pop_connection, from .connections import (Connection, get_current_connection, pop_connection,
push_connection, use_connection) push_connection, use_connection)
from .job import cancel_job, get_current_job, requeue_job from .job import cancel_job, get_current_job, requeue_job, Retry
from .queue import Queue from .queue import Queue
from .version import VERSION from .version import VERSION
from .worker import SimpleWorker, Worker from .worker import SimpleWorker, Worker

@ -3,15 +3,17 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import inspect import inspect
import json
import pickle import pickle
import warnings import warnings
import zlib import zlib
from collections.abc import Iterable
from distutils.version import StrictVersion from distutils.version import StrictVersion
from functools import partial from functools import partial
from uuid import uuid4 from uuid import uuid4
from rq.compat import as_text, decode_redis_hash, string_types, text_type from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError from .exceptions import NoSuchJobError
from .local import LocalStack from .local import LocalStack
@ -345,6 +347,9 @@ class Job(object):
self._dependency_ids = [] self._dependency_ids = []
self.meta = {} self.meta = {}
self.serializer = resolve_serializer(serializer) self.serializer = resolve_serializer(serializer)
self.retries_left = None
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.redis_server_version = None self.redis_server_version = None
def __repr__(self): # noqa # pragma: no cover def __repr__(self): # noqa # pragma: no cover
@ -370,7 +375,7 @@ class Job(object):
first time the ID is requested. first time the ID is requested.
""" """
if self._id is None: if self._id is None:
self._id = text_type(uuid4()) self._id = str(uuid4())
return self._id return self._id
def set_id(self, value): def set_id(self, value):
@ -481,7 +486,7 @@ class Job(object):
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status')) if obj.get('status') else None self._status = obj.get('status').decode() if obj.get('status') else None
dependency_id = obj.get('dependency_id', None) dependency_id = obj.get('dependency_id', None)
self._dependency_ids = [as_text(dependency_id)] if dependency_id else [] self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
@ -489,6 +494,10 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None
if obj.get('retry_intervals'):
self.retry_intervals = json.loads(obj.get('retry_intervals').decode())
raw_exc_info = obj.get('exc_info') raw_exc_info = obj.get('exc_info')
if raw_exc_info: if raw_exc_info:
try: try:
@ -516,10 +525,17 @@ class Job(object):
You can exclude serializing the `meta` dictionary by setting You can exclude serializing the `meta` dictionary by setting
`include_meta=False`. `include_meta=False`.
""" """
obj = {} obj = {
obj['created_at'] = utcformat(self.created_at or utcnow()) 'created_at': utcformat(self.created_at or utcnow()),
obj['data'] = zlib.compress(self.data) 'data': zlib.compress(self.data),
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
}
if self.retries_left is not None:
obj['retries_left'] = self.retries_left
if self.retry_intervals is not None:
obj['retry_intervals'] = json.dumps(self.retry_intervals)
if self.origin is not None: if self.origin is not None:
obj['origin'] = self.origin obj['origin'] = self.origin
if self.description is not None: if self.description is not None:
@ -527,8 +543,6 @@ class Job(object):
if self.enqueued_at is not None: if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at) obj['enqueued_at'] = utcformat(self.enqueued_at)
obj['started_at'] = utcformat(self.started_at) if self.started_at else ''
obj['ended_at'] = utcformat(self.ended_at) if self.ended_at else ''
if self._result is not None: if self._result is not None:
try: try:
obj['result'] = self.serializer.dumps(self._result) obj['result'] = self.serializer.dumps(self._result)
@ -733,6 +747,17 @@ class Job(object):
return FailedJobRegistry(self.origin, connection=self.connection, return FailedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__) job_class=self.__class__)
def get_retry_interval(self):
"""Returns the desired retry interval.
If number of retries is bigger than length of intervals, the first
value in the list will be used multiple times.
"""
if self.retry_intervals is None:
return 0
number_of_intervals = len(self.retry_intervals)
index = max(number_of_intervals - self.retries_left, 0)
return self.retry_intervals[index]
def register_dependency(self, pipeline=None): def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they """Jobs may have dependencies. Jobs are enqueued only if the job they
depend on is successfully performed. We record this relation as depend on is successfully performed. We record this relation as
@ -800,3 +825,24 @@ class Job(object):
) )
_job_stack = LocalStack() _job_stack = LocalStack()
class Retry(object):
def __init__(self, max, interval=0):
"""`interval` can be a positive number or a list of ints"""
super().__init__()
if max < 1:
raise ValueError('max: please enter a value greater than 0')
if isinstance(interval, int):
if interval < 0:
raise ValueError('interval: negative numbers are not allowed')
intervals = [interval]
elif isinstance(interval, Iterable):
for i in interval:
if i < 0:
raise ValueError('interval: negative numbers are not allowed')
intervals = interval
self.max = max
self.intervals = intervals

@ -282,7 +282,7 @@ class Queue(object):
def create_job(self, func, args=None, kwargs=None, timeout=None, def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None, description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED): meta=None, status=JobStatus.QUEUED, retry=None):
"""Creates a job based on parameters given.""" """Creates a job based on parameters given."""
timeout = parse_timeout(timeout) timeout = parse_timeout(timeout)
@ -306,12 +306,16 @@ class Queue(object):
origin=self.name, meta=meta, serializer=self.serializer origin=self.name, meta=meta, serializer=self.serializer
) )
if retry:
job.retries_left = retry.max
job.retry_intervals = retry.intervals
return job return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None, description=None, depends_on=None, job_id=None,
at_front=False, meta=None): at_front=False, meta=None, retry=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
nd nd
@ -324,6 +328,7 @@ nd
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, depends_on=depends_on, failure_ttl=failure_ttl, description=description, depends_on=depends_on,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
retry=retry
) )
# If a _dependent_ job depends on any unfinished job, register all the # If a _dependent_ job depends on any unfinished job, register all the
@ -397,6 +402,7 @@ nd
job_id = kwargs.pop('job_id', None) job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False) at_front = kwargs.pop('at_front', False)
meta = kwargs.pop('meta', None) meta = kwargs.pop('meta', None)
retry = kwargs.pop('retry', None)
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa
@ -404,40 +410,46 @@ nd
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return (f, timeout, description, result_ttl, ttl, failure_ttl, return (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) depends_on, job_id, at_front, meta, retry, args, kwargs)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it.""" """Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl, (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs) depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call( return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout, func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, job_id=job_id, description=description, depends_on=depends_on, job_id=job_id,
at_front=at_front, meta=meta at_front=at_front, meta=meta, retry=retry
) )
def enqueue_at(self, datetime, f, *args, **kwargs): def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time""" """Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry
(f, timeout, description, result_ttl, ttl, failure_ttl, (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs) depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl, timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta) depends_on=depends_on, job_id=job_id, meta=meta)
return self.schedule_job(job, datetime)
def schedule_job(self, job, datetime, pipeline=None):
"""Puts job on ScheduledJobRegistry"""
from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
# Add Queue key set
pipeline.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipeline)
registry.schedule(job, datetime, pipeline=pipeline)
pipeline.execute()
pipe = pipeline if pipeline is not None else self.connection.pipeline()
# Add Queue key set
pipe.sadd(self.redis_queues_keys, self.key)
job.save(pipeline=pipe)
registry.schedule(job, datetime, pipeline=pipe)
if pipeline is None:
pipe.execute()
return job return job
def enqueue_in(self, time_delta, func, *args, **kwargs): def enqueue_in(self, time_delta, func, *args, **kwargs):

@ -88,7 +88,6 @@ class RQScheduler(object):
# Always reset _scheduled_job_registries when acquiring locks # Always reset _scheduled_job_registries when acquiring locks
self._scheduled_job_registries = [] self._scheduled_job_registries = []
self._acquired_locks = self._acquired_locks.union(successful_locks) self._acquired_locks = self._acquired_locks.union(successful_locks)
self.lock_acquisition_time = datetime.now() self.lock_acquisition_time = datetime.now()
# If auto_start is requested and scheduler is not started, # If auto_start is requested and scheduler is not started,
@ -110,7 +109,7 @@ class RQScheduler(object):
) )
@classmethod @classmethod
def get_locking_key(self, name): def get_locking_key(cls, name):
"""Returns scheduler key for a given queue name""" """Returns scheduler key for a given queue name"""
return SCHEDULER_LOCKING_KEY_TEMPLATE % name return SCHEDULER_LOCKING_KEY_TEMPLATE % name

@ -229,7 +229,7 @@ def str_to_date(date_str):
if not date_str: if not date_str:
return return
else: else:
return utcparse(as_text(date_str)) return utcparse(date_str.decode())
def parse_timeout(timeout): def parse_timeout(timeout):

@ -12,7 +12,8 @@ import sys
import time import time
import traceback import traceback
import warnings import warnings
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from distutils.version import StrictVersion from distutils.version import StrictVersion
from uuid import uuid4 from uuid import uuid4
@ -706,7 +707,7 @@ class Worker(object):
self._horse_pid = child_pid self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time())) self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def monitor_work_horse(self, job): def monitor_work_horse(self, job, queue):
"""The worker will monitor the work horse and make sure that it """The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to either executes successfully or the status of the job is set to
failed failed
@ -759,7 +760,7 @@ class Worker(object):
).format(ret_val)) ).format(ret_val))
self.handle_job_failure( self.handle_job_failure(
job, job, queue=queue,
exc_string="Work-horse was terminated unexpectedly " exc_string="Work-horse was terminated unexpectedly "
"(waitpid returned %s)" % ret_val "(waitpid returned %s)" % ret_val
) )
@ -772,7 +773,7 @@ class Worker(object):
""" """
self.set_state(WorkerStatus.BUSY) self.set_state(WorkerStatus.BUSY)
self.fork_work_horse(job, queue) self.fork_work_horse(job, queue)
self.monitor_work_horse(job) self.monitor_work_horse(job, queue)
self.set_state(WorkerStatus.IDLE) self.set_state(WorkerStatus.IDLE)
def main_work_horse(self, job, queue): def main_work_horse(self, job, queue):
@ -826,7 +827,7 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}' msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time())) self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_failure(self, job, started_job_registry=None, def handle_job_failure(self, job, queue, started_job_registry=None,
exc_string=''): exc_string=''):
"""Handles the failure or an executing job by: """Handles the failure or an executing job by:
1. Setting the job status to failed 1. Setting the job status to failed
@ -842,23 +843,41 @@ class Worker(object):
self.connection, self.connection,
job_class=self.job_class job_class=self.job_class
) )
# Requeue/reschedule if retry is configured
if job.retries_left and job.retries_left > 0:
retry = True
retry_interval = job.get_retry_interval()
job.retries_left = job.retries_left - 1
else:
retry = False
job.set_status(JobStatus.FAILED, pipeline=pipeline) job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler: if not self.disable_default_exception_handler:
failed_job_registry = FailedJobRegistry(job.origin, job.connection, failed_job_registry = FailedJobRegistry(job.origin, job.connection,
job_class=self.job_class) job_class=self.job_class)
failed_job_registry.add(job, ttl=job.failure_ttl, failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline) exc_string=exc_string, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline) self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at: if job.started_at and job.ended_at:
self.increment_total_working_time( self.increment_total_working_time(
job.ended_at - job.started_at, job.ended_at - job.started_at, pipeline
pipeline
) )
if retry:
if retry_interval:
scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
job.set_status(JobStatus.SCHEDULED)
queue.schedule_job(job, scheduled_datetime, pipeline=pipeline)
else:
queue.enqueue_job(job, pipeline=pipeline)
try: try:
pipeline.execute() pipeline.execute()
except Exception: except Exception:
@ -930,7 +949,7 @@ class Worker(object):
exc_string = self._get_safe_exception_string( exc_string = self._get_safe_exception_string(
traceback.format_exception(*exc_info) traceback.format_exception(*exc_info)
) )
self.handle_job_failure(job=job, exc_string=exc_string, self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
started_job_registry=started_job_registry) started_job_registry=started_job_registry)
self.handle_exception(job, *exc_info) self.handle_exception(job, *exc_info)
return False return False

@ -12,7 +12,7 @@ from redis import WatchError
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry
from rq.queue import Queue from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry, from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry, FinishedJobRegistry, StartedJobRegistry,
@ -224,6 +224,19 @@ class TestJob(RQTestCase):
sorted(self.testconn.hkeys(job.key)), sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description', b'ended_at', b'started_at']) [b'created_at', b'data', b'description', b'ended_at', b'started_at'])
def test_persistence_of_retry_data(self):
"""Retry related data is stored and restored properly"""
job = Job.create(func=fixtures.some_calculation)
job.retries_left = 3
job.retry_intervals = [1, 2, 3]
job.save()
job.retries_left = None
job.retry_intervals = None
job.refresh()
self.assertEqual(job.retries_left, 3)
self.assertEqual(job.retry_intervals, [1, 2, 3])
def test_persistence_of_parent_job(self): def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key.""" """Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=fixtures.some_calculation) parent_job = Job.create(func=fixtures.some_calculation)
@ -922,3 +935,48 @@ class TestJob(RQTestCase):
assert dependent_job.dependencies_are_met() assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED assert dependent_job.get_status() == JobStatus.QUEUED
def test_retry(self):
"""Retry parses `max` and `interval` correctly"""
retry = Retry(max=1)
self.assertEqual(retry.max, 1)
self.assertEqual(retry.intervals, [0])
self.assertRaises(ValueError, Retry, max=0)
retry = Retry(max=2, interval=5)
self.assertEqual(retry.max, 2)
self.assertEqual(retry.intervals, [5])
retry = Retry(max=3, interval=[5, 10])
self.assertEqual(retry.max, 3)
self.assertEqual(retry.intervals, [5, 10])
# interval can't be negative
self.assertRaises(ValueError, Retry, max=1, interval=-5)
self.assertRaises(ValueError, Retry, max=1, interval=[1, -5])
def test_get_retry_interval(self):
"""get_retry_interval() returns the right retry interval"""
job = Job.create(func=fixtures.say_hello)
# Handle case where self.retry_intervals is None
job.retries_left = 2
self.assertEqual(job.get_retry_interval(), 0)
# Handle the most common case
job.retry_intervals = [1, 2]
self.assertEqual(job.get_retry_interval(), 1)
job.retries_left = 1
self.assertEqual(job.get_retry_interval(), 2)
# Handle cases where number of retries > length of interval
job.retries_left = 4
job.retry_intervals = [1, 2, 3]
self.assertEqual(job.get_retry_interval(), 1)
job.retries_left = 3
self.assertEqual(job.get_retry_interval(), 1)
job.retries_left = 2
self.assertEqual(job.get_retry_interval(), 2)
job.retries_left = 1
self.assertEqual(job.get_retry_interval(), 3)

@ -6,7 +6,7 @@ import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from mock.mock import patch from mock.mock import patch
from rq import Queue from rq import Retry, Queue
from rq.compat import utc from rq.compat import utc
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
@ -657,6 +657,15 @@ class TestQueue(RQTestCase):
self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
def test_enqueue_with_retry(self):
"""Enqueueing with retry_strategy works"""
queue = Queue('example', connection=self.testconn)
job = queue.enqueue(say_hello, retry=Retry(max=3, interval=5))
job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.retries_left, 3)
self.assertEqual(job.retry_intervals, [5])
class TestJobScheduling(RQTestCase): class TestJobScheduling(RQTestCase):
def test_enqueue_at(self): def test_enqueue_at(self):

@ -411,7 +411,7 @@ class TestFailedJobRegistry(RQTestCase):
timestamp = current_timestamp() timestamp = current_timestamp()
job = q.enqueue(div_by_zero, failure_ttl=5) job = q.enqueue(div_by_zero, failure_ttl=5)
w.handle_job_failure(job) w.handle_job_failure(job, q)
# job is added to FailedJobRegistry with default failure ttl # job is added to FailedJobRegistry with default failure ttl
self.assertIn(job.id, registry.get_job_ids()) self.assertIn(job.id, registry.get_job_ids())
self.assertLess(self.testconn.zscore(registry.key, job.id), self.assertLess(self.testconn.zscore(registry.key, job.id),
@ -419,6 +419,6 @@ class TestFailedJobRegistry(RQTestCase):
# job is added to FailedJobRegistry with specified ttl # job is added to FailedJobRegistry with specified ttl
job = q.enqueue(div_by_zero, failure_ttl=5) job = q.enqueue(div_by_zero, failure_ttl=5)
w.handle_job_failure(job) w.handle_job_failure(job, q)
self.assertLess(self.testconn.zscore(registry.key, job.id), self.assertLess(self.testconn.zscore(registry.key, job.id),
timestamp + 7) timestamp + 7)

@ -11,7 +11,7 @@ import sys
import time import time
import zlib import zlib
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from multiprocessing import Process from multiprocessing import Process
from time import sleep from time import sleep
@ -30,7 +30,7 @@ from tests.fixtures import (
from rq import Queue, SimpleWorker, Worker, get_current_connection from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2 from rq.compat import as_text, PY2
from rq.job import Job, JobStatus from rq.job import Job, JobStatus, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
from rq.utils import utcnow from rq.utils import utcnow
@ -326,7 +326,7 @@ class TestWorker(RQTestCase):
registry = StartedJobRegistry(connection=worker.connection) registry = StartedJobRegistry(connection=worker.connection)
job.started_at = utcnow() job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75) job.ended_at = job.started_at + timedelta(seconds=0.75)
worker.handle_job_failure(job) worker.handle_job_failure(job, queue)
worker.handle_job_success(job, queue, registry) worker.handle_job_success(job, queue, registry)
worker.refresh() worker.refresh()
@ -334,7 +334,7 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.successful_job_count, 1) self.assertEqual(worker.successful_job_count, 1)
self.assertEqual(worker.total_working_time, 1.5) # 1.5 seconds self.assertEqual(worker.total_working_time, 1.5) # 1.5 seconds
worker.handle_job_failure(job) worker.handle_job_failure(job, queue)
worker.handle_job_success(job, queue, registry) worker.handle_job_success(job, queue, registry)
worker.refresh() worker.refresh()
@ -342,6 +342,60 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.successful_job_count, 2) self.assertEqual(worker.successful_job_count, 2)
self.assertEqual(worker.total_working_time, 3.0) self.assertEqual(worker.total_working_time, 3.0)
def test_handle_retry(self):
"""handle_job_failure() handles retry properly"""
connection = self.testconn
queue = Queue(connection=connection)
retry = Retry(max=2)
job = queue.enqueue(div_by_zero, retry=retry)
worker = Worker([queue])
# If job if configured to retry, it will be put back in the queue
# This is the original execution
queue.empty()
worker.handle_job_failure(job, queue)
job.refresh()
self.assertEqual(job.retries_left, 1)
self.assertEqual([job.id], queue.job_ids)
# First retry
queue.empty()
worker.handle_job_failure(job, queue)
job.refresh()
self.assertEqual(job.retries_left, 0)
self.assertEqual([job.id], queue.job_ids)
# Second retry
queue.empty()
worker.handle_job_failure(job, queue)
job.refresh()
self.assertEqual(job.retries_left, 0)
self.assertEqual([], queue.job_ids)
def test_retry_interval(self):
"""Retries with intervals are scheduled"""
connection = self.testconn
queue = Queue(connection=connection)
retry = Retry(max=1, interval=5)
job = queue.enqueue(div_by_zero, retry=retry)
worker = Worker([queue])
registry = queue.scheduled_job_registry
# If job if configured to retry with interval, it will be scheduled,
# not directly put back in the queue
queue.empty()
worker.handle_job_failure(job, queue)
job.refresh()
self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
self.assertEqual(job.retries_left, 0)
self.assertEqual(len(registry), 1)
self.assertEqual(queue.job_ids, [])
# Scheduled time is roughly 5 seconds from now
scheduled_time = registry.get_scheduled_time(job)
now = datetime.now(timezone.utc)
self.assertTrue(now + timedelta(seconds=4) < scheduled_time < now + timedelta(seconds=6))
def test_total_working_time(self): def test_total_working_time(self):
"""worker.total_working_time is stored properly""" """worker.total_working_time is stored properly"""
queue = Queue() queue = Queue()
@ -1056,7 +1110,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.fork_work_horse(job, queue) w.fork_work_horse(job, queue)
p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5)) p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5))
p.start() p.start()
w.monitor_work_horse(job) w.monitor_work_horse(job, queue)
job_status = job.get_status() job_status = job.get_status()
p.join(1) p.join(1)
self.assertEqual(job_status, JobStatus.FAILED) self.assertEqual(job_status, JobStatus.FAILED)
@ -1082,7 +1136,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
job.timeout = 5 job.timeout = 5
w.job_monitoring_interval = 1 w.job_monitoring_interval = 1
now = utcnow() now = utcnow()
w.monitor_work_horse(job) w.monitor_work_horse(job, queue)
fudge_factor = 1 fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor total_time = w.job_monitoring_interval + 65 + fudge_factor
self.assertTrue((utcnow() - now).total_seconds() < total_time) self.assertTrue((utcnow() - now).total_seconds() < total_time)

Loading…
Cancel
Save