Success and failure callbacks (#1480)

* Support enqueueing with on_success_callback

* Got success callback to execute properly

* Added on_failure callback

* Document success and failure callbacks

* More Flake8 fixes

* Mention that options can also be passed in via environment variables

* Increase coverage on test_callbacks.py
main
Selwin Ong 4 years ago committed by GitHub
parent 591f11bcc3
commit 5590aab458
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -69,12 +69,14 @@ results are kept. Expired jobs will be automatically deleted. Defaults to 500 se
* `ttl` specifies the maximum queued time (in seconds) of the job before it's discarded. * `ttl` specifies the maximum queued time (in seconds) of the job before it's discarded.
This argument defaults to `None` (infinite TTL). This argument defaults to `None` (infinite TTL).
* `failure_ttl` specifies how long failed jobs are kept (defaults to 1 year) * `failure_ttl` specifies how long failed jobs are kept (defaults to 1 year)
* `depends_on` specifies another job (or job id) that must complete before this * `depends_on` specifies another job (or list of jobs) that must complete before this
job will be queued. job will be queued.
* `job_id` allows you to manually specify this job's `job_id` * `job_id` allows you to manually specify this job's `job_id`
* `at_front` will place the job at the *front* of the queue, instead of the * `at_front` will place the job at the *front* of the queue, instead of the
back back
* `description` to add additional description to enqueued jobs. * `description` to add additional description to enqueued jobs.
* `on_success` allows you to run a function after a job completes successfully
* `on_failure` allows you to run a function after a job fails
* `args` and `kwargs`: use these to explicitly pass arguments and keyword to the * `args` and `kwargs`: use these to explicitly pass arguments and keyword to the
underlying job function. This is useful if your function happens to have underlying job function. This is useful if your function happens to have
conflicting argument names with RQ, for example `description` or `ttl`. conflicting argument names with RQ, for example `description` or `ttl`.
@ -132,6 +134,73 @@ with q.connection.pipeline() as pipe:
`Queue.prepare_data` accepts all arguments that `Queue.parse_args` does **EXCEPT** for `depends_on`, `Queue.prepare_data` accepts all arguments that `Queue.parse_args` does **EXCEPT** for `depends_on`,
which is not supported at this time, so dependencies will be up to you to setup. which is not supported at this time, so dependencies will be up to you to setup.
## Job dependencies
RQ allows you to chain the execution of multiple jobs.
To execute a job that depends on another job, use the `depends_on` argument:
```python
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
```
Specifying multiple dependencies are also supported:
```python
queue = Queue('low', connection=redis)
foo_job = queue.enqueue(foo)
bar_job = queue.enqueue(bar)
baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
```
The ability to handle job dependencies allows you to split a big job into
several smaller ones. A job that is dependent on another is enqueued only when
its dependency finishes *successfully*.
## Job Callbacks
_New in version 1.9.0._
If you want to execute a function whenever a job completes or fails, RQ provides
`on_success` and `on_failure` callbacks.
```python
queue.enqueue(say_hello, on_success=report_success, on_failure=report_failure)
```
### Success Callback
Success callbacks must be a function that accepts `job`, `connection` and `result` arguments.
Your function should also accept `*args` and `**kwargs` so your application doesn't break
when additional parameters are added.
```python
def report_success(job, connection, result, *args, **kwargs):
pass
```
Success callbacks are executed after job execution is complete, before dependents are enqueued.
If an exception happens when your callback is executed, job status will be set to `FAILED`
and dependents won't be enqueued.
Callbacks are limited to 60 seconds of execution time. If you want to execute a long running job,
consider using RQ's job dependency feature instead.
### Failure Callbacks
Failure callbacks are functions that accept `job`, `connection`, `type`, `value` and `traceback`
arguments. `type`, `value` and `traceback` values returned by [sys.exc_info()](https://docs.python.org/3/library/sys.html#sys.exc_info), which is the exception raised when executing your job.
```python
def report_failure(job, connection, type, value, traceback):
pass
```
Failure callbacks are limited to 60 seconds of execution time.
## Working with Queues ## Working with Queues
Besides enqueuing jobs, Queues have a few useful methods: Besides enqueuing jobs, Queues have a few useful methods:
@ -224,22 +293,6 @@ as `ALWAYS_EAGER`. Note, however, that you still need a working connection to
a redis instance for storing states related to job execution and completion. a redis instance for storing states related to job execution and completion.
## Job dependencies
New in RQ 0.4.0 is the ability to chain the execution of multiple jobs.
To execute a job that depends on another job, use the `depends_on` argument:
```python
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
```
The ability to handle job dependencies allows you to split a big job into
several smaller ones. A job that is dependent on another is enqueued only when
its dependency finishes *successfully*.
## The worker ## The worker
To learn about workers, see the [workers][w] documentation. To learn about workers, see the [workers][w] documentation.

@ -290,14 +290,13 @@ SENTRY_DSN = 'sync+http://public:secret@example.com/1'
The example above shows all the options that are currently supported. The example above shows all the options that are currently supported.
_Note: The_ `QUEUES` _and_ `REDIS_PASSWORD` _settings are new since 0.3.3._
To specify which module to read settings from, use the `-c` option: To specify which module to read settings from, use the `-c` option:
```console ```console
$ rq worker -c settings $ rq worker -c settings
``` ```
Alternatively, you can also pass in these options via environment variables.
## Custom Worker Classes ## Custom Worker Classes

@ -9,3 +9,4 @@ DEFAULT_RESULT_TTL = 500
DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
CALLBACK_TIMEOUT = 60

@ -83,7 +83,7 @@ class Job:
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, ttl=None, status=None, description=None, result_ttl=None, ttl=None, status=None, description=None,
depends_on=None, timeout=None, id=None, origin=None, meta=None, depends_on=None, timeout=None, id=None, origin=None, meta=None,
failure_ttl=None, serializer=None): failure_ttl=None, serializer=None, *, on_success=None, on_failure=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -121,6 +121,16 @@ class Job:
job._args = args job._args = args
job._kwargs = kwargs job._kwargs = kwargs
if on_success:
if not inspect.isfunction(on_success) and not inspect.isbuiltin(on_success):
raise ValueError('on_success callback must be a function')
job._success_callback_name = '{0}.{1}'.format(on_success.__module__, on_success.__qualname__)
if on_failure:
if not inspect.isfunction(on_failure) and not inspect.isbuiltin(on_failure):
raise ValueError('on_failure callback must be a function')
job._failure_callback_name = '{0}.{1}'.format(on_failure.__module__, on_failure.__qualname__)
# Extra meta data # Extra meta data
job.description = description or job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = parse_timeout(result_ttl) job.result_ttl = parse_timeout(result_ttl)
@ -220,6 +230,26 @@ class Job:
return import_attribute(self.func_name) return import_attribute(self.func_name)
@property
def success_callback(self):
if self._success_callback is UNEVALUATED:
if self._success_callback_name:
self._success_callback = import_attribute(self._success_callback_name)
else:
self._success_callback = None
return self._success_callback
@property
def failure_callback(self):
if self._failure_callback is UNEVALUATED:
if self._failure_callback_name:
self._failure_callback = import_attribute(self._failure_callback_name)
else:
self._failure_callback = None
return self._failure_callback
def _deserialize_data(self): def _deserialize_data(self):
try: try:
self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data)
@ -346,6 +376,10 @@ class Job:
self._instance = UNEVALUATED self._instance = UNEVALUATED
self._args = UNEVALUATED self._args = UNEVALUATED
self._kwargs = UNEVALUATED self._kwargs = UNEVALUATED
self._success_callback_name = None
self._success_callback = UNEVALUATED
self._failure_callback_name = None
self._failure_callback = UNEVALUATED
self.description = None self.description = None
self.origin = None self.origin = None
self.enqueued_at = None self.enqueued_at = None
@ -400,8 +434,8 @@ class Job:
raise TypeError('id must be a string, not {0}'.format(type(value))) raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value self._id = value
def heartbeat(self, heartbeat, ttl, pipeline=None): def heartbeat(self, timestamp, ttl, pipeline=None):
self.last_heartbeat = heartbeat self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
self.started_job_registry.add(self, ttl, pipeline=pipeline) self.started_job_registry.add(self, ttl, pipeline=pipeline)
@ -508,10 +542,16 @@ class Job:
except Exception: except Exception:
self._result = "Unserializable return value" self._result = "Unserializable return value"
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None
self._status = obj.get('status').decode() if obj.get('status') else None self._status = obj.get('status').decode() if obj.get('status') else None
if obj.get('success_callback_name'):
self._success_callback_name = obj.get('success_callback_name').decode()
if obj.get('failure_callback_name'):
self._failure_callback_name = obj.get('failure_callback_name').decode()
dep_ids = obj.get('dependency_ids') dep_ids = obj.get('dependency_ids')
dep_id = obj.get('dependency_id') # for backwards compatibility dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
@ -554,6 +594,8 @@ class Job:
obj = { obj = {
'created_at': utcformat(self.created_at or utcnow()), 'created_at': utcformat(self.created_at or utcnow()),
'data': zlib.compress(self.data), 'data': zlib.compress(self.data),
'success_callback_name': self._success_callback_name if self._success_callback_name else '',
'failure_callback_name': self._failure_callback_name if self._failure_callback_name else '',
'started_at': utcformat(self.started_at) if self.started_at else '', 'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',

@ -292,7 +292,8 @@ class Queue:
def create_job(self, func, args=None, kwargs=None, timeout=None, def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None, description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED, retry=None): meta=None, status=JobStatus.QUEUED, retry=None, *,
on_success=None, on_failure=None):
"""Creates a job based on parameters given.""" """Creates a job based on parameters given."""
timeout = parse_timeout(timeout) timeout = parse_timeout(timeout)
@ -313,7 +314,8 @@ class Queue:
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
status=status, description=description, status=status, description=description,
depends_on=depends_on, timeout=timeout, id=job_id, depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta, serializer=self.serializer origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success,
on_failure=on_failure
) )
if retry: if retry:
@ -371,9 +373,9 @@ class Queue:
return job return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, result_ttl=None, ttl=None, failure_ttl=None, description=None,
description=None, depends_on=None, job_id=None, depends_on=None, job_id=None, at_front=False, meta=None,
at_front=False, meta=None, retry=None, pipeline=None): retry=None, on_success=None, on_failure=None, pipeline=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
nd nd
@ -386,7 +388,7 @@ nd
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, depends_on=depends_on, failure_ttl=failure_ttl, description=description, depends_on=depends_on,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
retry=retry retry=retry, on_success=on_success, on_failure=on_failure
) )
job = self.setup_dependencies( job = self.setup_dependencies(
@ -477,6 +479,8 @@ nd
at_front = kwargs.pop('at_front', False) at_front = kwargs.pop('at_front', False)
meta = kwargs.pop('meta', None) meta = kwargs.pop('meta', None)
retry = kwargs.pop('retry', None) retry = kwargs.pop('retry', None)
on_success = kwargs.pop('on_success', None)
on_failure = kwargs.pop('on_failure', None)
pipeline = kwargs.pop('pipeline', None) pipeline = kwargs.pop('pipeline', None)
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
@ -485,30 +489,35 @@ nd
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return (f, timeout, description, result_ttl, ttl, failure_ttl, return (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) depends_on, job_id, at_front, meta, retry, on_success, on_failure,
pipeline, args, kwargs)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it.""" """Creates a job to represent the delayed function call and enqueues it."""
(f, timeout, description, result_ttl, ttl, failure_ttl, (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) depends_on, job_id, at_front, meta, retry, on_success,
on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call( return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout, func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
description=description, depends_on=depends_on, job_id=job_id, description=description, depends_on=depends_on, job_id=job_id,
at_front=at_front, meta=meta, retry=retry, pipeline=pipeline at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure,
pipeline=pipeline
) )
def enqueue_at(self, datetime, f, *args, **kwargs): def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time""" """Schedules a job to be enqueued at specified time"""
(f, timeout, description, result_ttl, ttl, failure_ttl, (f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) depends_on, job_id, at_front, meta, retry, on_success, on_failure,
pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, ttl=ttl, timeout=timeout, result_ttl=result_ttl, ttl=ttl,
failure_ttl=failure_ttl, description=description, failure_ttl=failure_ttl, description=description,
depends_on=depends_on, job_id=job_id, meta=meta, retry=retry) depends_on=depends_on, job_id=job_id, meta=meta, retry=retry,
on_success=on_success, on_failure=on_failure)
return self.schedule_job(job, datetime, pipeline=pipeline) return self.schedule_job(job, datetime, pipeline=pipeline)

@ -31,7 +31,7 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .compat import as_text, string_types, text_type from .compat import as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL, from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
@ -435,7 +435,7 @@ class Worker:
stat = None stat = None
try: try:
pid, stat = os.waitpid(self.horse_pid, 0) pid, stat = os.waitpid(self.horse_pid, 0)
except ChildProcessError as e: except ChildProcessError:
# ChildProcessError: [Errno 10] No child processes # ChildProcessError: [Errno 10] No child processes
pass pass
return pid, stat return pid, stat
@ -873,7 +873,7 @@ class Worker:
self.log = logger self.log = logger
try: try:
self.perform_job(job, queue) self.perform_job(job, queue)
except: except: # noqa
os._exit(1) os._exit(1)
# os._exit() is the way to exit from childs after a fork(), in # os._exit() is the way to exit from childs after a fork(), in
@ -1002,6 +1002,18 @@ class Worker:
except redis.exceptions.WatchError: except redis.exceptions.WatchError:
continue continue
def execute_success_callback(self, job, result):
"""Executes success_callback with timeout"""
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.success_callback(job, self.connection, result)
def execute_failure_callback(self, job):
"""Executes failure_callback with timeout"""
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection, *sys.exc_info())
def perform_job(self, job, queue): def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called """Performs the actual work of a job. Will/should only be called
inside the work horse's process. inside the work horse's process.
@ -1023,6 +1035,10 @@ class Worker:
# Pickle the result in the same try-except block since we need # Pickle the result in the same try-except block since we need
# to use the same exc handling when pickling fails # to use the same exc handling when pickling fails
job._result = rv job._result = rv
if job.success_callback:
self.execute_success_callback(job, rv)
self.handle_job_success(job=job, self.handle_job_success(job=job,
queue=queue, queue=queue,
started_job_registry=started_job_registry) started_job_registry=started_job_registry)
@ -1030,6 +1046,18 @@ class Worker:
job.ended_at = utcnow() job.ended_at = utcnow()
exc_info = sys.exc_info() exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info)) exc_string = ''.join(traceback.format_exception(*exc_info))
if job.failure_callback:
try:
self.execute_failure_callback(job)
except: # noqa
self.log.error(
'Worker %s: error while executing failure callback',
self.key, exc_info=True
)
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
started_job_registry=started_job_registry) started_job_registry=started_job_registry)
self.handle_exception(job, *exc_info) self.handle_exception(job, *exc_info)
@ -1140,7 +1168,8 @@ class SimpleWorker(Worker):
return self.perform_job(job, queue) return self.perform_job(job, queue)
def get_heartbeat_ttl(self, job): def get_heartbeat_ttl(self, job):
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL. # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
# # We should just stick to DEFAULT_WORKER_TTL.
if job.timeout == -1: if job.timeout == -1:
return DEFAULT_WORKER_TTL return DEFAULT_WORKER_TTL
else: else:

@ -7,12 +7,8 @@ import os
from redis import Redis from redis import Redis
from rq import pop_connection, push_connection from rq import pop_connection, push_connection
from rq.job import cancel_job
try: import unittest
import unittest
except ImportError:
import unittest2 as unittest # noqa
def find_empty_redis_database(ssl=False): def find_empty_redis_database(ssl=False):
@ -20,7 +16,7 @@ def find_empty_redis_database(ssl=False):
will use/connect it when no keys are in there. will use/connect it when no keys are in there.
""" """
for dbnum in range(4, 17): for dbnum in range(4, 17):
connection_kwargs = { 'db': dbnum } connection_kwargs = {'db': dbnum}
if ssl: if ssl:
connection_kwargs['port'] = 9736 connection_kwargs['port'] = 9736
connection_kwargs['ssl'] = True connection_kwargs['ssl'] = True
@ -35,9 +31,11 @@ def find_empty_redis_database(ssl=False):
def slow(f): def slow(f):
return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), "Slow tests disabled")(f) return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), "Slow tests disabled")(f)
def ssl_test(f): def ssl_test(f):
return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), "SSL tests disabled")(f) return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), "SSL tests disabled")(f)
class RQTestCase(unittest.TestCase): class RQTestCase(unittest.TestCase):
"""Base class to inherit test cases from for RQ. """Base class to inherit test cases from for RQ.

@ -46,12 +46,15 @@ def do_nothing():
"""The best job in the world.""" """The best job in the world."""
pass pass
def raise_exc(): def raise_exc():
raise Exception('raise_exc error') raise Exception('raise_exc error')
def raise_exc_mock(): def raise_exc_mock():
return raise_exc return raise_exc
def div_by_zero(x): def div_by_zero(x):
"""Prepare for a division-by-zero exception.""" """Prepare for a division-by-zero exception."""
return x / 0 return x / 0
@ -63,6 +66,7 @@ def some_calculation(x, y, z=1):
""" """
return x * y / z return x * y / z
def rpush(key, value, append_worker_name=False, sleep=0): def rpush(key, value, append_worker_name=False, sleep=0):
"""Push a value into a list in Redis. Useful for detecting the order in """Push a value into a list in Redis. Useful for detecting the order in
which jobs were executed.""" which jobs were executed."""
@ -73,9 +77,11 @@ def rpush(key, value, append_worker_name=False, sleep=0):
redis = get_current_connection() redis = get_current_connection()
redis.rpush(key, value) redis.rpush(key, value)
def check_dependencies_are_met(): def check_dependencies_are_met():
return get_current_job().dependencies_are_met() return get_current_job().dependencies_are_met()
def create_file(path): def create_file(path):
"""Creates a file at the given path. Actually, leaves evidence that the """Creates a file at the given path. Actually, leaves evidence that the
job ran.""" job ran."""
@ -87,18 +93,19 @@ def create_file_after_timeout(path, timeout):
time.sleep(timeout) time.sleep(timeout)
create_file(path) create_file(path)
def create_file_after_timeout_and_setsid(path, timeout): def create_file_after_timeout_and_setsid(path, timeout):
os.setsid() os.setsid()
create_file_after_timeout(path, timeout) create_file_after_timeout(path, timeout)
def launch_process_within_worker_and_store_pid(path, timeout):
def launch_process_within_worker_and_store_pid(path, timeout):
p = subprocess.Popen(['sleep', str(timeout)]) p = subprocess.Popen(['sleep', str(timeout)])
with open(path, 'w') as f: with open(path, 'w') as f:
f.write('{}'.format(p.pid)) f.write('{}'.format(p.pid))
p.wait() p.wait()
def access_self(): def access_self():
assert get_current_connection() is not None assert get_current_connection() is not None
assert get_current_job() is not None assert get_current_job() is not None
@ -264,3 +271,18 @@ def burst_two_workers(queue, timeout=2, tries=5, pause=0.1):
# Now can start the second worker. # Now can start the second worker.
w2.work(burst=True) w2.work(burst=True)
w1.join(timeout) w1.join(timeout)
def save_result(job, connection, result):
"""Store job result in a key"""
connection.set('success_callback:%s' % job.id, result, ex=60)
def save_exception(job, connection, type, value, traceback):
"""Store job exception in a key"""
connection.set('failure_callback:%s' % job.id, str(value), ex=60)
def erroneous_callback(job):
"""A callback that's not written properly"""
pass

@ -0,0 +1,142 @@
from datetime import timedelta
from tests import RQTestCase
from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello
from rq import Queue, Worker
from rq.job import Job, JobStatus, UNEVALUATED
from rq.worker import SimpleWorker
class QueueCallbackTestCase(RQTestCase):
def test_enqueue_with_success_callback(self):
"""Test enqueue* methods with on_success"""
queue = Queue(connection=self.testconn)
# Only functions and builtins are supported as callback
with self.assertRaises(ValueError):
queue.enqueue(say_hello, on_success=Job.fetch)
job = queue.enqueue(say_hello, on_success=print)
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=print)
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
def test_enqueue_with_failure_callback(self):
"""queue.enqueue* methods with on_failure is persisted correctly"""
queue = Queue(connection=self.testconn)
# Only functions and builtins are supported as callback
with self.assertRaises(ValueError):
queue.enqueue(say_hello, on_failure=Job.fetch)
job = queue.enqueue(say_hello, on_failure=print)
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=print)
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
job = queue.enqueue(say_hello, on_success=save_result)
# Callback is executed when job is successfully executed
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(
self.testconn.get('success_callback:%s' % job.id).decode(),
job.result
)
job = queue.enqueue(div_by_zero, on_success=save_result)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
def test_erroneous_success_callback(self):
"""Test exception handling when executing success callback"""
queue = Queue(connection=self.testconn)
worker = Worker([queue])
# If success_callback raises an error, job will is considered as failed
job = queue.enqueue(say_hello, on_success=erroneous_callback)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
def test_failure_callback(self):
"""Test failure callback is executed only when job a fails"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
job = queue.enqueue(div_by_zero, on_failure=save_exception)
# Callback is executed when job is successfully executed
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
print(job.exc_info)
self.assertIn('div_by_zero',
self.testconn.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=save_result)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
# TODO: add test case for error while executing failure callback
class JobCallbackTestCase(RQTestCase):
def test_job_creation_with_success_callback(self):
"""Ensure callbacks are created and persisted properly"""
job = Job.create(say_hello)
self.assertIsNone(job._success_callback_name)
# _success_callback starts with UNEVALUATED
self.assertEqual(job._success_callback, UNEVALUATED)
self.assertEqual(job.success_callback, None)
# _success_callback becomes `None` after `job.success_callback` is called if there's no success callback
self.assertEqual(job._success_callback, None)
# job.success_callback is assigned properly
job = Job.create(say_hello, on_success=print)
self.assertIsNotNone(job._success_callback_name)
self.assertEqual(job.success_callback, print)
job.save()
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
def test_job_creation_with_failure_callback(self):
"""Ensure failure callbacks are persisted properly"""
job = Job.create(say_hello)
self.assertIsNone(job._failure_callback_name)
# _failure_callback starts with UNEVALUATED
self.assertEqual(job._failure_callback, UNEVALUATED)
self.assertEqual(job.failure_callback, None)
# _failure_callback becomes `None` after `job.failure_callback` is called if there's no failure callback
self.assertEqual(job._failure_callback, None)
# job.failure_callback is assigned properly
job = Job.create(say_hello, on_failure=print)
self.assertIsNotNone(job._failure_callback_name)
self.assertEqual(job.failure_callback, print)
job.save()
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)

@ -210,8 +210,10 @@ class TestJob(RQTestCase):
# ... and no other keys are stored # ... and no other keys are stored
self.assertEqual( self.assertEqual(
sorted(self.testconn.hkeys(job.key)), set(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', b'worker_name']) {b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at',
b'worker_name', b'success_callback_name', b'failure_callback_name'}
)
self.assertEqual(job.last_heartbeat, None) self.assertEqual(job.last_heartbeat, None)
self.assertEqual(job.last_heartbeat, None) self.assertEqual(job.last_heartbeat, None)

Loading…
Cancel
Save