diff --git a/docs/_config.yml b/docs/_config.yml
index 11eecb6..e97aba3 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -16,6 +16,8 @@ navigation:
url: /docs/results/
- text: Jobs
url: /docs/jobs/
+ - text: Exceptions
+ url: /docs/exceptions/
- text: Scheduling Jobs
url: /docs/scheduling/
- text: Monitoring
@@ -23,9 +25,7 @@ navigation:
- text: Job Registries
url: /docs/job_registries/
- text: Connections
- url: /docs/connections/
- - text: Exceptions
- url: /docs/exceptions/
+ url: /docs/connections/
- text: Testing
url: /docs/testing/
- text: Patterns
diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md
index d9b1827..2510ebf 100644
--- a/docs/docs/exceptions.md
+++ b/docs/docs/exceptions.md
@@ -1,17 +1,64 @@
---
-title: "RQ: Exceptions"
+title: "RQ: Exceptions & Retries"
layout: docs
---
-Jobs can fail due to exceptions occurring. When your RQ workers run in the
+Jobs can fail due to exceptions occurring. When your RQ workers run in the
background, how do you get notified of these exceptions?
-## Default: the `FailedJobRegistry`
+## Default: FailedJobRegistry
The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't
execute successfully is stored here, along with its exception information (type,
-value, traceback). While this makes sure no failing jobs "get lost", this is
-of no use to get notified pro-actively about job failure.
+value, traceback).
+
+```python
+from redis import Redis
+from rq import Queue
+from rq.registry import FailedJobRegistry
+
+redis = Redis()
+queue = Queue(connection=redis)
+registry = FailedJobRegistry(queue=queue)
+
+# Show all failed job IDs and the exceptions they caused during runtime
+for job_id in registry.get_job_ids():
+    job = queue.fetch_job(job_id)
+ print(job_id, job.exc_info)
+```
+
+## Retrying Failed Jobs
+
+_New in version 1.5.0_
+
+RQ lets you easily retry failed jobs. To configure retries, use RQ's
+`Retry` object that accepts `max` and `interval` arguments. For example:
+
+```python
+from redis import Redis
+from rq import Retry, Queue
+
+from somewhere import my_func
+
+queue = Queue(connection=Redis())
+# Retry up to 3 times, failed job will be requeued immediately
+queue.enqueue(my_func, retry=Retry(max=3))
+
+# Retry up to 3 times, with 60 seconds interval in between executions
+queue.enqueue(my_func, retry=Retry(max=3, interval=60))
+
+# Retry up to 3 times, with longer interval in between retries
+queue.enqueue(my_func, retry=Retry(max=3, interval=[10, 30, 60]))
+```
+
+
+
+
Note:
+
+    If you use the `interval` argument with `Retry`, don't forget to run your workers using
+ the `--with-scheduler` argument.
+
+
## Custom Exception Handlers
diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md
index 7ecff03..7d27ba9 100644
--- a/docs/docs/scheduling.md
+++ b/docs/docs/scheduling.md
@@ -20,7 +20,7 @@ RQ to have job scheduling capabilities without:
2. Worrying about a separate `Scheduler` class.
-# Scheduling Jobs for Execution
+## Scheduling Jobs for Execution
There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.
@@ -76,7 +76,7 @@ registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry
```
-# Running the Scheduler
+## Running the Scheduler
If you use RQ's scheduling features, you need to run RQ workers with the
scheduler component enabled.
diff --git a/rq/__init__.py b/rq/__init__.py
index 9ad8be1..4ee6fc5 100644
--- a/rq/__init__.py
+++ b/rq/__init__.py
@@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
from .connections import (Connection, get_current_connection, pop_connection,
push_connection, use_connection)
-from .job import cancel_job, get_current_job, requeue_job
+from .job import cancel_job, get_current_job, requeue_job, Retry
from .queue import Queue
from .version import VERSION
from .worker import SimpleWorker, Worker
diff --git a/rq/job.py b/rq/job.py
index 5ba66a4..a27cf5d 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -3,15 +3,17 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import inspect
+import json
import pickle
import warnings
import zlib
+from collections.abc import Iterable
from distutils.version import StrictVersion
from functools import partial
from uuid import uuid4
-from rq.compat import as_text, decode_redis_hash, string_types, text_type
+from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@@ -342,9 +344,12 @@ class Job(object):
self.failure_ttl = None
self.ttl = None
self._status = None
- self._dependency_ids = []
+ self._dependency_ids = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
+ self.retries_left = None
+ # retry_intervals is a list of int e.g [60, 120, 240]
+ self.retry_intervals = None
self.redis_server_version = None
def __repr__(self): # noqa # pragma: no cover
@@ -370,7 +375,7 @@ class Job(object):
first time the ID is requested.
"""
if self._id is None:
- self._id = text_type(uuid4())
+ self._id = str(uuid4())
return self._id
def set_id(self, value):
@@ -481,13 +486,17 @@ class Job(object):
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
- self._status = as_text(obj.get('status')) if obj.get('status') else None
+ self._status = obj.get('status').decode() if obj.get('status') else None
dependency_id = obj.get('dependency_id', None)
self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
+
+ self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None
+ if obj.get('retry_intervals'):
+ self.retry_intervals = json.loads(obj.get('retry_intervals').decode())
raw_exc_info = obj.get('exc_info')
if raw_exc_info:
@@ -516,10 +525,17 @@ class Job(object):
You can exclude serializing the `meta` dictionary by setting
`include_meta=False`.
"""
- obj = {}
- obj['created_at'] = utcformat(self.created_at or utcnow())
- obj['data'] = zlib.compress(self.data)
-
+ obj = {
+ 'created_at': utcformat(self.created_at or utcnow()),
+ 'data': zlib.compress(self.data),
+ 'started_at': utcformat(self.started_at) if self.started_at else '',
+ 'ended_at': utcformat(self.ended_at) if self.ended_at else '',
+ }
+
+ if self.retries_left is not None:
+ obj['retries_left'] = self.retries_left
+ if self.retry_intervals is not None:
+ obj['retry_intervals'] = json.dumps(self.retry_intervals)
if self.origin is not None:
obj['origin'] = self.origin
if self.description is not None:
@@ -527,8 +543,6 @@ class Job(object):
if self.enqueued_at is not None:
obj['enqueued_at'] = utcformat(self.enqueued_at)
- obj['started_at'] = utcformat(self.started_at) if self.started_at else ''
- obj['ended_at'] = utcformat(self.ended_at) if self.ended_at else ''
if self._result is not None:
try:
obj['result'] = self.serializer.dumps(self._result)
@@ -732,6 +746,17 @@ class Job(object):
from .registry import FailedJobRegistry
return FailedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__)
+
+ def get_retry_interval(self):
+ """Returns the desired retry interval.
+ If number of retries is bigger than length of intervals, the first
+ value in the list will be used multiple times.
+ """
+ if self.retry_intervals is None:
+ return 0
+ number_of_intervals = len(self.retry_intervals)
+ index = max(number_of_intervals - self.retries_left, 0)
+ return self.retry_intervals[index]
def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they
@@ -800,3 +825,24 @@ class Job(object):
)
_job_stack = LocalStack()
+
+
+class Retry(object):
+ def __init__(self, max, interval=0):
+ """`interval` can be a positive number or a list of ints"""
+ super().__init__()
+ if max < 1:
+ raise ValueError('max: please enter a value greater than 0')
+
+ if isinstance(interval, int):
+ if interval < 0:
+ raise ValueError('interval: negative numbers are not allowed')
+ intervals = [interval]
+ elif isinstance(interval, Iterable):
+ for i in interval:
+ if i < 0:
+ raise ValueError('interval: negative numbers are not allowed')
+ intervals = interval
+
+ self.max = max
+ self.intervals = intervals
diff --git a/rq/queue.py b/rq/queue.py
index 25fa95c..99ef158 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -282,7 +282,7 @@ class Queue(object):
def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
- meta=None, status=JobStatus.QUEUED):
+ meta=None, status=JobStatus.QUEUED, retry=None):
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout)
@@ -306,12 +306,16 @@ class Queue(object):
origin=self.name, meta=meta, serializer=self.serializer
)
+ if retry:
+ job.retries_left = retry.max
+ job.retry_intervals = retry.intervals
+
return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
- at_front=False, meta=None):
+ at_front=False, meta=None, retry=None):
"""Creates a job to represent the delayed function call and enqueues
it.
 It is much like `.enqueue()`, except that it takes the function's args
@@ -324,6 +328,7 @@ class Queue(object):
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, depends_on=depends_on,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
+ retry=retry
)
# If a _dependent_ job depends on any unfinished job, register all the
@@ -397,6 +402,7 @@ class Queue(object):
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
meta = kwargs.pop('meta', None)
+ retry = kwargs.pop('retry', None)
if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs' # noqa
@@ -404,40 +410,46 @@ class Queue(object):
kwargs = kwargs.pop('kwargs', None)
return (f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs)
+ depends_on, job_id, at_front, meta, retry, args, kwargs)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+ depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, job_id=job_id,
- at_front=at_front, meta=meta
+ at_front=at_front, meta=meta, retry=retry
)
def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
- from .registry import ScheduledJobRegistry
(f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+ depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta)
+ return self.schedule_job(job, datetime)
+
+ def schedule_job(self, job, datetime, pipeline=None):
+ """Puts job on ScheduledJobRegistry"""
+ from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self)
- with self.connection.pipeline() as pipeline:
- # Add Queue key set
- pipeline.sadd(self.redis_queues_keys, self.key)
- job.save(pipeline=pipeline)
- registry.schedule(job, datetime, pipeline=pipeline)
- pipeline.execute()
+ pipe = pipeline if pipeline is not None else self.connection.pipeline()
+
+ # Add Queue key set
+ pipe.sadd(self.redis_queues_keys, self.key)
+ job.save(pipeline=pipe)
+ registry.schedule(job, datetime, pipeline=pipe)
+ if pipeline is None:
+ pipe.execute()
return job
def enqueue_in(self, time_delta, func, *args, **kwargs):
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 4d2312f..e8d66e1 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -51,7 +51,7 @@ class RQScheduler(object):
self._stop_requested = False
self._status = self.Status.STOPPED
self._process = None
-
+
@property
def connection(self):
if self._connection:
@@ -87,8 +87,7 @@ class RQScheduler(object):
# Always reset _scheduled_job_registries when acquiring locks
self._scheduled_job_registries = []
- self._acquired_locks = self._acquired_locks.union(successful_locks)
-
+ self._acquired_locks = self._acquired_locks.union(successful_locks)
self.lock_acquisition_time = datetime.now()
# If auto_start is requested and scheduler is not started,
@@ -110,7 +109,7 @@ class RQScheduler(object):
)
@classmethod
- def get_locking_key(self, name):
+ def get_locking_key(cls, name):
"""Returns scheduler key for a given queue name"""
return SCHEDULER_LOCKING_KEY_TEMPLATE % name
@@ -172,7 +171,7 @@ class RQScheduler(object):
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
self._status = self.Status.STOPPED
-
+
def start(self):
self._status = self.Status.STARTED
# Redis instance can't be pickled across processes so we need to
diff --git a/rq/utils.py b/rq/utils.py
index 0058ce4..4a22880 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -229,7 +229,7 @@ def str_to_date(date_str):
if not date_str:
return
else:
- return utcparse(as_text(date_str))
+ return utcparse(date_str.decode())
def parse_timeout(timeout):
diff --git a/rq/worker.py b/rq/worker.py
index dd9c0fc..463a00b 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -12,7 +12,8 @@ import sys
import time
import traceback
import warnings
-from datetime import timedelta
+
+from datetime import datetime, timedelta, timezone
from distutils.version import StrictVersion
from uuid import uuid4
@@ -706,7 +707,7 @@ class Worker(object):
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
- def monitor_work_horse(self, job):
+ def monitor_work_horse(self, job, queue):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
@@ -759,7 +760,7 @@ class Worker(object):
).format(ret_val))
self.handle_job_failure(
- job,
+ job, queue=queue,
exc_string="Work-horse was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
)
@@ -772,7 +773,7 @@ class Worker(object):
"""
self.set_state(WorkerStatus.BUSY)
self.fork_work_horse(job, queue)
- self.monitor_work_horse(job)
+ self.monitor_work_horse(job, queue)
self.set_state(WorkerStatus.IDLE)
def main_work_horse(self, job, queue):
@@ -826,7 +827,7 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
- def handle_job_failure(self, job, started_job_registry=None,
+ def handle_job_failure(self, job, queue, started_job_registry=None,
exc_string=''):
"""Handles the failure or an executing job by:
1. Setting the job status to failed
@@ -842,22 +843,40 @@ class Worker(object):
self.connection,
job_class=self.job_class
)
- job.set_status(JobStatus.FAILED, pipeline=pipeline)
+
+ # Requeue/reschedule if retry is configured
+ if job.retries_left and job.retries_left > 0:
+ retry = True
+ retry_interval = job.get_retry_interval()
+ job.retries_left = job.retries_left - 1
+ else:
+ retry = False
+ job.set_status(JobStatus.FAILED, pipeline=pipeline)
+
started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler:
+
failed_job_registry = FailedJobRegistry(job.origin, job.connection,
job_class=self.job_class)
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
+
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at:
self.increment_total_working_time(
- job.ended_at - job.started_at,
- pipeline
+ job.ended_at - job.started_at, pipeline
)
+
+ if retry:
+ if retry_interval:
+ scheduled_datetime = datetime.now(timezone.utc) + timedelta(seconds=retry_interval)
+ job.set_status(JobStatus.SCHEDULED)
+ queue.schedule_job(job, scheduled_datetime, pipeline=pipeline)
+ else:
+ queue.enqueue_job(job, pipeline=pipeline)
try:
pipeline.execute()
@@ -930,7 +949,7 @@ class Worker(object):
exc_string = self._get_safe_exception_string(
traceback.format_exception(*exc_info)
)
- self.handle_job_failure(job=job, exc_string=exc_string,
+ self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
started_job_registry=started_job_registry)
self.handle_exception(job, *exc_info)
return False
diff --git a/tests/test_job.py b/tests/test_job.py
index a7cb7ce..a993c60 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -12,7 +12,7 @@ from redis import WatchError
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError
-from rq.job import Job, JobStatus, cancel_job, get_current_job
+from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry
from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry,
@@ -223,6 +223,19 @@ class TestJob(RQTestCase):
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description', b'ended_at', b'started_at'])
+
+ def test_persistence_of_retry_data(self):
+ """Retry related data is stored and restored properly"""
+ job = Job.create(func=fixtures.some_calculation)
+ job.retries_left = 3
+ job.retry_intervals = [1, 2, 3]
+ job.save()
+
+ job.retries_left = None
+ job.retry_intervals = None
+ job.refresh()
+ self.assertEqual(job.retries_left, 3)
+ self.assertEqual(job.retry_intervals, [1, 2, 3])
def test_persistence_of_parent_job(self):
"""Storing jobs with parent job, either instance or key."""
@@ -922,3 +935,48 @@ class TestJob(RQTestCase):
assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED
+
+ def test_retry(self):
+ """Retry parses `max` and `interval` correctly"""
+ retry = Retry(max=1)
+ self.assertEqual(retry.max, 1)
+ self.assertEqual(retry.intervals, [0])
+ self.assertRaises(ValueError, Retry, max=0)
+
+ retry = Retry(max=2, interval=5)
+ self.assertEqual(retry.max, 2)
+ self.assertEqual(retry.intervals, [5])
+
+ retry = Retry(max=3, interval=[5, 10])
+ self.assertEqual(retry.max, 3)
+ self.assertEqual(retry.intervals, [5, 10])
+
+ # interval can't be negative
+ self.assertRaises(ValueError, Retry, max=1, interval=-5)
+ self.assertRaises(ValueError, Retry, max=1, interval=[1, -5])
+
+ def test_get_retry_interval(self):
+ """get_retry_interval() returns the right retry interval"""
+ job = Job.create(func=fixtures.say_hello)
+
+ # Handle case where self.retry_intervals is None
+ job.retries_left = 2
+ self.assertEqual(job.get_retry_interval(), 0)
+
+ # Handle the most common case
+ job.retry_intervals = [1, 2]
+ self.assertEqual(job.get_retry_interval(), 1)
+ job.retries_left = 1
+ self.assertEqual(job.get_retry_interval(), 2)
+
+ # Handle cases where number of retries > length of interval
+ job.retries_left = 4
+ job.retry_intervals = [1, 2, 3]
+ self.assertEqual(job.get_retry_interval(), 1)
+ job.retries_left = 3
+ self.assertEqual(job.get_retry_interval(), 1)
+ job.retries_left = 2
+ self.assertEqual(job.get_retry_interval(), 2)
+ job.retries_left = 1
+ self.assertEqual(job.get_retry_interval(), 3)
+
\ No newline at end of file
diff --git a/tests/test_queue.py b/tests/test_queue.py
index f438854..d936d3f 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -6,7 +6,7 @@ import json
from datetime import datetime, timedelta
from mock.mock import patch
-from rq import Queue
+from rq import Retry, Queue
from rq.compat import utc
from rq.exceptions import NoSuchJobError
@@ -656,6 +656,15 @@ class TestQueue(RQTestCase):
self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
+
+ def test_enqueue_with_retry(self):
+ """Enqueueing with retry_strategy works"""
+ queue = Queue('example', connection=self.testconn)
+ job = queue.enqueue(say_hello, retry=Retry(max=3, interval=5))
+
+ job = Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.retries_left, 3)
+ self.assertEqual(job.retry_intervals, [5])
class TestJobScheduling(RQTestCase):
diff --git a/tests/test_registry.py b/tests/test_registry.py
index cc1f4e7..83f80f4 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -411,7 +411,7 @@ class TestFailedJobRegistry(RQTestCase):
timestamp = current_timestamp()
job = q.enqueue(div_by_zero, failure_ttl=5)
- w.handle_job_failure(job)
+ w.handle_job_failure(job, q)
# job is added to FailedJobRegistry with default failure ttl
self.assertIn(job.id, registry.get_job_ids())
self.assertLess(self.testconn.zscore(registry.key, job.id),
@@ -419,6 +419,6 @@ class TestFailedJobRegistry(RQTestCase):
# job is added to FailedJobRegistry with specified ttl
job = q.enqueue(div_by_zero, failure_ttl=5)
- w.handle_job_failure(job)
+ w.handle_job_failure(job, q)
self.assertLess(self.testconn.zscore(registry.key, job.id),
timestamp + 7)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index df902ac..931c1fa 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -11,7 +11,7 @@ import sys
import time
import zlib
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from multiprocessing import Process
from time import sleep
@@ -30,7 +30,7 @@ from tests.fixtures import (
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2
-from rq.job import Job, JobStatus
+from rq.job import Job, JobStatus, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
@@ -326,7 +326,7 @@ class TestWorker(RQTestCase):
registry = StartedJobRegistry(connection=worker.connection)
job.started_at = utcnow()
job.ended_at = job.started_at + timedelta(seconds=0.75)
- worker.handle_job_failure(job)
+ worker.handle_job_failure(job, queue)
worker.handle_job_success(job, queue, registry)
worker.refresh()
@@ -334,13 +334,67 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.successful_job_count, 1)
self.assertEqual(worker.total_working_time, 1.5) # 1.5 seconds
- worker.handle_job_failure(job)
+ worker.handle_job_failure(job, queue)
worker.handle_job_success(job, queue, registry)
worker.refresh()
self.assertEqual(worker.failed_job_count, 2)
self.assertEqual(worker.successful_job_count, 2)
self.assertEqual(worker.total_working_time, 3.0)
+
+ def test_handle_retry(self):
+ """handle_job_failure() handles retry properly"""
+ connection = self.testconn
+ queue = Queue(connection=connection)
+ retry = Retry(max=2)
+ job = queue.enqueue(div_by_zero, retry=retry)
+
+ worker = Worker([queue])
+
+        # If job is configured to retry, it will be put back in the queue
+ # This is the original execution
+ queue.empty()
+ worker.handle_job_failure(job, queue)
+ job.refresh()
+ self.assertEqual(job.retries_left, 1)
+ self.assertEqual([job.id], queue.job_ids)
+
+ # First retry
+ queue.empty()
+ worker.handle_job_failure(job, queue)
+ job.refresh()
+ self.assertEqual(job.retries_left, 0)
+ self.assertEqual([job.id], queue.job_ids)
+
+ # Second retry
+ queue.empty()
+ worker.handle_job_failure(job, queue)
+ job.refresh()
+ self.assertEqual(job.retries_left, 0)
+ self.assertEqual([], queue.job_ids)
+
+ def test_retry_interval(self):
+ """Retries with intervals are scheduled"""
+ connection = self.testconn
+ queue = Queue(connection=connection)
+ retry = Retry(max=1, interval=5)
+ job = queue.enqueue(div_by_zero, retry=retry)
+
+ worker = Worker([queue])
+ registry = queue.scheduled_job_registry
+        # If job is configured to retry with interval, it will be scheduled,
+ # not directly put back in the queue
+ queue.empty()
+ worker.handle_job_failure(job, queue)
+ job.refresh()
+ self.assertEqual(job.get_status(), JobStatus.SCHEDULED)
+ self.assertEqual(job.retries_left, 0)
+ self.assertEqual(len(registry), 1)
+ self.assertEqual(queue.job_ids, [])
+ # Scheduled time is roughly 5 seconds from now
+ scheduled_time = registry.get_scheduled_time(job)
+ now = datetime.now(timezone.utc)
+ self.assertTrue(now + timedelta(seconds=4) < scheduled_time < now + timedelta(seconds=6))
def test_total_working_time(self):
"""worker.total_working_time is stored properly"""
@@ -1056,7 +1110,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.fork_work_horse(job, queue)
p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5))
p.start()
- w.monitor_work_horse(job)
+ w.monitor_work_horse(job, queue)
job_status = job.get_status()
p.join(1)
self.assertEqual(job_status, JobStatus.FAILED)
@@ -1082,7 +1136,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
job.timeout = 5
w.job_monitoring_interval = 1
now = utcnow()
- w.monitor_work_horse(job)
+ w.monitor_work_horse(job, queue)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor
self.assertTrue((utcnow() - now).total_seconds() < total_time)