callback func as string (#1905)

* callback func as string

* add tests for string callbacks; update documentation and type annotations

* lint

* concise test for string callback

* add string callbacks to existing tests

* remove string callback testcase; extend existing testcases
main
Rishabh Ranjan 2 years ago committed by GitHub
parent f158fe7ba1
commit a53966c918
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -77,6 +77,7 @@ results are kept. Expired jobs will be automatically deleted. Defaults to 500 se
* `description` to add additional description to enqueued jobs.
* `on_success` allows you to run a function after a job completes successfully
* `on_failure` allows you to run a function after a job fails
* `on_stopped` allows you to run a function after a job is stopped
* `args` and `kwargs`: use these to explicitly pass arguments and keyword to the
underlying job function. This is useful if your function happens to have
conflicting argument names with RQ, for example `description` or `ttl`.
@ -217,6 +218,8 @@ queue.enqueue(say_hello,
on_stopped=Callback(report_stopped, timeout="2m")) # 2 minute timeout
```
You can also pass the function as a string reference: `Callback('my_package.my_module.my_func')`
### Success Callback
Success callbacks must be a function that accepts `job`, `connection` and `result` arguments.

@ -7,6 +7,7 @@ if TYPE_CHECKING:
from .job import Retry
from .defaults import DEFAULT_RESULT_TTL
from .job import Callback
from .queue import Queue
from .utils import backend_class
@ -28,9 +29,9 @@ class job: # noqa
description: Optional[str] = None,
failure_ttl: Optional[int] = None,
retry: Optional['Retry'] = None,
on_failure: Optional[Callable[..., Any]] = None,
on_success: Optional[Callable[..., Any]] = None,
on_stopped: Optional[Callable[..., Any]] = None,
on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
@ -59,9 +60,12 @@ class job: # noqa
description (Optional[str], optional): Job description. Defaults to None.
failure_ttl (Optional[int], optional): Failture time to live. Defaults to None.
retry (Optional[Retry], optional): A Retry object. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): Callable to run on failure. Defaults to None.
on_success (Optional[Callable[..., Any]], optional): Callable to run on success. Defaults to None.
on_stopped (Optional[Callable[..., Any]], optional): Callable to run when stopped. Defaults to None.
on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on failure. Defaults
to None.
on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on success. Defaults
to None.
on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run when stopped. Defaults
to None.
"""
self.queue = queue
self.queue_class = backend_class(self, 'queue_class', override=queue_class)

@ -158,9 +158,9 @@ class Job:
failure_ttl: Optional[int] = None,
serializer=None,
*,
on_success: Optional[Union['Callback', Callable[..., Any]]] = None,
on_failure: Optional[Union['Callback', Callable[..., Any]]] = None,
on_stopped: Optional[Union['Callback', Callable[..., Any]]] = None,
on_success: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated
on_failure: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated
on_stopped: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated
) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@ -193,20 +193,20 @@ class Job:
Defaults to None.
serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import
path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None.
on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run
when/if the Job finishes sucessfully. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run
when/if the Job fails. Defaults to None.
on_stopped (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run
when/if the Job is stopped. Defaults to None.
on_success (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job
finishes sucessfully. Defaults to None. Passing a callable is deprecated.
on_failure (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job
fails. Defaults to None. Passing a callable is deprecated.
on_stopped (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job
is stopped. Defaults to None. Passing a callable is deprecated.
Raises:
TypeError: If `args` is not a tuple/list
TypeError: If `kwargs` is not a dict
TypeError: If the `func` is something other than a string or a Callable reference
ValueError: If `on_failure` is not a function
ValueError: If `on_success` is not a function
ValueError: If `on_stopped` is not a function
ValueError: If `on_failure` is not a Callback or function or string
ValueError: If `on_success` is not a Callback or function or string
ValueError: If `on_stopped` is not a Callback or function or string
Returns:
Job: A job instance.
@ -248,7 +248,8 @@ class Job:
if on_success:
if not isinstance(on_success, Callback):
warnings.warn(
'Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', DeprecationWarning
'Passing a string or function for `on_success` is deprecated, pass `Callback` instead',
DeprecationWarning,
)
on_success = Callback(on_success) # backward compatibility
job._success_callback_name = on_success.name
@ -257,7 +258,8 @@ class Job:
if on_failure:
if not isinstance(on_failure, Callback):
warnings.warn(
'Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', DeprecationWarning
'Passing a string or function for `on_failure` is deprecated, pass `Callback` instead',
DeprecationWarning,
)
on_failure = Callback(on_failure) # backward compatibility
job._failure_callback_name = on_failure.name
@ -266,7 +268,8 @@ class Job:
if on_stopped:
if not isinstance(on_stopped, Callback):
warnings.warn(
'Passing a `Callable` `on_stopped` is deprecated, pass `Callback` instead', DeprecationWarning
'Passing a string or function for `on_stopped` is deprecated, pass `Callback` instead',
DeprecationWarning,
)
on_stopped = Callback(on_stopped) # backward compatibility
job._stopped_callback_name = on_stopped.name
@ -1640,13 +1643,15 @@ class Retry:
class Callback:
def __init__(self, func: Callable[..., Any], timeout: Optional[Any] = None):
if not inspect.isfunction(func) and not inspect.isbuiltin(func):
raise ValueError('Callback func must be a function')
def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None):
if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func):
raise ValueError('Callback `func` must be a string or function')
self.func = func
self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT
@property
def name(self) -> str:
if isinstance(self.func, str):
return self.func
return '{0}.{1}'.format(self.func.__module__, self.func.__qualname__)

@ -22,7 +22,7 @@ from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .dependency import Dependency
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
from .job import Callback, Job, JobStatus
from .logutils import blue, green
from .serializers import resolve_serializer
from .types import FunctionReferenceType, JobDependencyType
@ -515,9 +515,9 @@ class Queue:
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
on_stopped: Optional[Callable] = None,
on_success: Optional[Union[Callback, Callable]] = None,
on_failure: Optional[Union[Callback, Callable]] = None,
on_stopped: Optional[Union[Callback, Callable]] = None,
) -> Job:
"""Creates a job based on parameters given
@ -535,9 +535,13 @@ class Queue:
meta (Optional[Dict], optional): Job metadata. Defaults to None.
status (JobStatus, optional): Job status. Defaults to JobStatus.QUEUED.
retry (Optional[Retry], optional): The Retry Object. Defaults to None.
on_success (Optional[Callable], optional): On success callable. Defaults to None.
on_failure (Optional[Callable], optional): On failure callable. Defaults to None.
on_stopped (Optional[Callable], optional): On stopped callable. Defaults to None.
on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to
None. Callable is deprecated.
on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to
None. Callable is deprecated.
on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to
None. Callable is deprecated.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Raises:
ValueError: If the timeout is 0
@ -659,9 +663,9 @@ class Queue:
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
on_stopped: Optional[Callable[..., Any]] = None,
on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
@ -684,9 +688,12 @@ class Queue:
at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False.
meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None.
retry (Optional[Retry], optional): Retry object. Defaults to None.
on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None.
on_stopped (Optional[Callable[..., Any]], optional): Callable for on stopped. Defaults to None.
on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to
None. Callable is deprecated.
on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to
None. Callable is deprecated.
on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to
None. Callable is deprecated.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
@ -728,9 +735,9 @@ class Queue:
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
on_stopped: Optional[Callable] = None,
on_success: Optional[Union[Callback, Callable]] = None,
on_failure: Optional[Union[Callback, Callable]] = None,
on_stopped: Optional[Union[Callback, Callable]] = None,
) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@ -749,9 +756,12 @@ class Queue:
at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False.
meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None.
retry (Optional[Retry], optional): Retry object. Defaults to None.
on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None.
on_stopped (Optional[Callable[..., Any]], optional): Callable for on stopped. Defaults to None.
on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to
None. Callable is deprecated.
on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to
None. Callable is deprecated.
on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to
None. Callable is deprecated.
Returns:
EnqueueData: The EnqueueData

@ -102,6 +102,10 @@ def import_attribute(name: str) -> Callable[..., Any]:
attribute_bits.insert(0, module_name_bits.pop())
if module is None:
# maybe it's a builtin
try:
return __builtins__[name]
except KeyError:
raise ValueError('Invalid attribute name: %s' % name)
attribute_name = '.'.join(attribute_bits)

@ -1,7 +1,7 @@
from datetime import timedelta
from rq import Queue, Worker
from rq.job import UNEVALUATED, Job, JobStatus
from rq.job import UNEVALUATED, Callback, Job, JobStatus
from rq.serializers import JSONSerializer
from rq.worker import SimpleWorker
from tests import RQTestCase
@ -35,6 +35,17 @@ class QueueCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
def test_enqueue_with_failure_callback(self):
"""queue.enqueue* methods with on_failure is persisted correctly"""
queue = Queue(connection=self.testconn)
@ -53,6 +64,17 @@ class QueueCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
# test string callbacks
job = queue.enqueue(say_hello, on_failure=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
def test_enqueue_with_stopped_callback(self):
"""queue.enqueue* methods with on_stopped is persisted correctly"""
queue = Queue(connection=self.testconn)
@ -71,6 +93,17 @@ class QueueCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.stopped_callback, print)
# test string callbacks
job = queue.enqueue(long_process, on_stopped=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.stopped_callback, print)
job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback("print"))
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.stopped_callback, print)
class SyncJobCallback(RQTestCase):
def test_success_callback(self):
@ -85,6 +118,15 @@ class SyncJobCallback(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result"))
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.result)
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
def test_failure_callback(self):
"""queue.enqueue* methods with on_failure is persisted correctly"""
queue = Queue(is_async=False)
@ -97,17 +139,34 @@ class SyncJobCallback(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception"))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
def test_stopped_callback(self):
"""queue.enqueue* methods with on_stopped is persisted correctly"""
connection = self.testconn
queue = Queue('foo', connection=connection, serializer=JSONSerializer)
worker = SimpleWorker('foo', connection=connection, serializer=JSONSerializer)
job = queue.enqueue(long_process, on_stopped=save_result_if_not_stopped)
job.execute_stopped_callback(
worker.death_penalty_class
) # Calling execute_stopped_callback directly for coverage
self.assertTrue(self.testconn.exists('stopped_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(long_process, on_stopped=Callback("tests.fixtures.save_result_if_not_stopped"))
job.execute_stopped_callback(
worker.death_penalty_class
) # Calling execute_stopped_callback directly for coverage
self.assertTrue(self.testconn.exists('stopped_callback:%s' % job.id))
class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
@ -115,9 +174,8 @@ class WorkerCallbackTestCase(RQTestCase):
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
job = queue.enqueue(say_hello, on_success=save_result)
# Callback is executed when job is successfully executed
job = queue.enqueue(say_hello, on_success=save_result)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value())
@ -127,6 +185,17 @@ class WorkerCallbackTestCase(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result"))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
def test_erroneous_success_callback(self):
"""Test exception handling when executing success callback"""
queue = Queue(connection=self.testconn)
@ -137,14 +206,18 @@ class WorkerCallbackTestCase(RQTestCase):
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
# test string callbacks
job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.erroneous_callback"))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
def test_failure_callback(self):
"""Test failure callback is executed only when job a fails"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
job = queue.enqueue(div_by_zero, on_failure=save_exception)
# Callback is executed when job is successfully executed
job = queue.enqueue(div_by_zero, on_failure=save_exception)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
@ -156,6 +229,19 @@ class WorkerCallbackTestCase(RQTestCase):
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
# test string callbacks
job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception"))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
print(job.exc_info)
self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result"))
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
# TODO: add test case for error while executing failure callback
@ -179,6 +265,15 @@ class JobCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
# test string callbacks
job = Job.create(say_hello, on_success=Callback("print"))
self.assertIsNotNone(job._success_callback_name)
self.assertEqual(job.success_callback, print)
job.save()
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.success_callback, print)
def test_job_creation_with_failure_callback(self):
"""Ensure failure callbacks are persisted properly"""
job = Job.create(say_hello)
@ -198,6 +293,15 @@ class JobCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
# test string callbacks
job = Job.create(say_hello, on_failure=Callback("print"))
self.assertIsNotNone(job._failure_callback_name)
self.assertEqual(job.failure_callback, print)
job.save()
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.failure_callback, print)
def test_job_creation_with_stopped_callback(self):
"""Ensure stopped callbacks are persisted properly"""
job = Job.create(say_hello)
@ -216,3 +320,12 @@ class JobCallbackTestCase(RQTestCase):
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.stopped_callback, print)
# test string callbacks
job = Job.create(say_hello, on_stopped=Callback("print"))
self.assertIsNotNone(job._stopped_callback_name)
self.assertEqual(job.stopped_callback, print)
job.save()
job = Job.fetch(id=job.id, connection=self.testconn)
self.assertEqual(job.stopped_callback, print)

Loading…
Cancel
Save