diff --git a/docs/docs/index.md b/docs/docs/index.md index 9b6a922..aa3b2c9 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -77,6 +77,7 @@ results are kept. Expired jobs will be automatically deleted. Defaults to 500 se * `description` to add additional description to enqueued jobs. * `on_success` allows you to run a function after a job completes successfully * `on_failure` allows you to run a function after a job fails +* `on_stopped` allows you to run a function after a job is stopped * `args` and `kwargs`: use these to explicitly pass arguments and keyword to the underlying job function. This is useful if your function happens to have conflicting argument names with RQ, for example `description` or `ttl`. @@ -217,6 +218,8 @@ queue.enqueue(say_hello, on_stopped=Callback(report_stopped, timeout="2m")) # 2 minute timeout ``` +You can also pass the function as a string reference: `Callback('my_package.my_module.my_func')` + ### Success Callback Success callbacks must be a function that accepts `job`, `connection` and `result` arguments. diff --git a/rq/decorators.py b/rq/decorators.py index e15e3ff..a5d602d 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -7,6 +7,7 @@ if TYPE_CHECKING: from .job import Retry from .defaults import DEFAULT_RESULT_TTL +from .job import Callback from .queue import Queue from .utils import backend_class @@ -28,9 +29,9 @@ class job: # noqa description: Optional[str] = None, failure_ttl: Optional[int] = None, retry: Optional['Retry'] = None, - on_failure: Optional[Callable[..., Any]] = None, - on_success: Optional[Callable[..., Any]] = None, - on_stopped: Optional[Callable[..., Any]] = None, + on_failure: Optional[Union[Callback, Callable[..., Any]]] = None, + on_success: Optional[Union[Callback, Callable[..., Any]]] = None, + on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None, ): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required @@ -59,9 +60,12 @@ class job: # noqa description (Optional[str], optional): Job description. Defaults to None. failure_ttl (Optional[int], optional): Failture time to live. Defaults to None. retry (Optional[Retry], optional): A Retry object. Defaults to None. - on_failure (Optional[Callable[..., Any]], optional): Callable to run on failure. Defaults to None. - on_success (Optional[Callable[..., Any]], optional): Callable to run on success. Defaults to None. - on_stopped (Optional[Callable[..., Any]], optional): Callable to run when stopped. Defaults to None. + on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on failure. Defaults + to None. + on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on success. Defaults + to None. + on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run when stopped. Defaults + to None. """ self.queue = queue self.queue_class = backend_class(self, 'queue_class', override=queue_class) diff --git a/rq/job.py b/rq/job.py index 4fa712d..0acfa40 100644 --- a/rq/job.py +++ b/rq/job.py @@ -158,9 +158,9 @@ class Job: failure_ttl: Optional[int] = None, serializer=None, *, - on_success: Optional[Union['Callback', Callable[..., Any]]] = None, - on_failure: Optional[Union['Callback', Callable[..., Any]]] = None, - on_stopped: Optional[Union['Callback', Callable[..., Any]]] = None, + on_success: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated + on_failure: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated + on_stopped: Optional[Union['Callback', Callable[..., Any]]] = None, # Callable is deprecated ) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -193,20 +193,20 @@ class Job: Defaults to None. serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None. - on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run - when/if the Job finishes sucessfully. Defaults to None. - on_failure (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run - when/if the Job fails. Defaults to None. - on_stopped (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run - when/if the Job is stopped. Defaults to None. + on_success (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job + finishes sucessfully. Defaults to None. Passing a callable is deprecated. + on_failure (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job + fails. Defaults to None. Passing a callable is deprecated. + on_stopped (Optional[Union['Callback', Callable[..., Any]]], optional): A callback to run when/if the Job + is stopped. Defaults to None. Passing a callable is deprecated. Raises: TypeError: If `args` is not a tuple/list TypeError: If `kwargs` is not a dict TypeError: If the `func` is something other than a string or a Callable reference - ValueError: If `on_failure` is not a function - ValueError: If `on_success` is not a function - ValueError: If `on_stopped` is not a function + ValueError: If `on_failure` is not a Callback or function or string + ValueError: If `on_success` is not a Callback or function or string + ValueError: If `on_stopped` is not a Callback or function or string Returns: Job: A job instance. @@ -248,7 +248,8 @@ class Job: if on_success: if not isinstance(on_success, Callback): warnings.warn( - 'Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', DeprecationWarning + 'Passing a string or function for `on_success` is deprecated, pass `Callback` instead', + DeprecationWarning, ) on_success = Callback(on_success) # backward compatibility job._success_callback_name = on_success.name @@ -257,7 +258,8 @@ class Job: if on_failure: if not isinstance(on_failure, Callback): warnings.warn( - 'Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', DeprecationWarning + 'Passing a string or function for `on_failure` is deprecated, pass `Callback` instead', + DeprecationWarning, ) on_failure = Callback(on_failure) # backward compatibility job._failure_callback_name = on_failure.name @@ -266,7 +268,8 @@ class Job: if on_stopped: if not isinstance(on_stopped, Callback): warnings.warn( - 'Passing a `Callable` `on_stopped` is deprecated, pass `Callback` instead', DeprecationWarning + 'Passing a string or function for `on_stopped` is deprecated, pass `Callback` instead', + DeprecationWarning, ) on_stopped = Callback(on_stopped) # backward compatibility job._stopped_callback_name = on_stopped.name @@ -1640,13 +1643,15 @@ class Retry: class Callback: - def __init__(self, func: Callable[..., Any], timeout: Optional[Any] = None): - if not inspect.isfunction(func) and not inspect.isbuiltin(func): - raise ValueError('Callback func must be a function') + def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[Any] = None): + if not isinstance(func, str) and not inspect.isfunction(func) and not inspect.isbuiltin(func): + raise ValueError('Callback `func` must be a string or function') self.func = func self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT @property def name(self) -> str: + if isinstance(self.func, str): + return self.func return '{0}.{1}'.format(self.func.__module__, self.func.__qualname__) diff --git a/rq/queue.py b/rq/queue.py index 381e2cd..e0e48c5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,7 +22,7 @@ from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .dependency import Dependency from .exceptions import DequeueTimeout, NoSuchJobError -from .job import Job, JobStatus +from .job import Callback, Job, JobStatus from .logutils import blue, green from .serializers import resolve_serializer from .types import FunctionReferenceType, JobDependencyType @@ -515,9 +515,9 @@ class Queue: status: JobStatus = JobStatus.QUEUED, retry: Optional['Retry'] = None, *, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, - on_stopped: Optional[Callable] = None, + on_success: Optional[Union[Callback, Callable]] = None, + on_failure: Optional[Union[Callback, Callable]] = None, + on_stopped: Optional[Union[Callback, Callable]] = None, ) -> Job: """Creates a job based on parameters given @@ -535,9 +535,13 @@ class Queue: meta (Optional[Dict], optional): Job metadata. Defaults to None. status (JobStatus, optional): Job status. Defaults to JobStatus.QUEUED. retry (Optional[Retry], optional): The Retry Object. Defaults to None. - on_success (Optional[Callable], optional): On success callable. Defaults to None. - on_failure (Optional[Callable], optional): On failure callable. Defaults to None. - on_stopped (Optional[Callable], optional): On stopped callable. Defaults to None. + on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to + None. Callable is deprecated. + on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to + None. Callable is deprecated. + on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to + None. Callable is deprecated. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. Raises: ValueError: If the timeout is 0 @@ -659,9 +663,9 @@ class Queue: at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, - on_stopped: Optional[Callable[..., Any]] = None, + on_success: Optional[Union[Callback, Callable[..., Any]]] = None, + on_failure: Optional[Union[Callback, Callable[..., Any]]] = None, + on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None, pipeline: Optional['Pipeline'] = None, ) -> Job: """Creates a job to represent the delayed function call and enqueues it. @@ -684,9 +688,12 @@ class Queue: at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False. meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None. retry (Optional[Retry], optional): Retry object. Defaults to None. - on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None. - on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None. - on_stopped (Optional[Callable[..., Any]], optional): Callable for on stopped. Defaults to None. + on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to + None. Callable is deprecated. + on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to + None. Callable is deprecated. + on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to + None. Callable is deprecated. pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. Returns: @@ -728,9 +735,9 @@ class Queue: at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, - on_stopped: Optional[Callable] = None, + on_success: Optional[Union[Callback, Callable]] = None, + on_failure: Optional[Union[Callback, Callable]] = None, + on_stopped: Optional[Union[Callback, Callable]] = None, ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -749,9 +756,12 @@ class Queue: at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False. meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None. retry (Optional[Retry], optional): Retry object. Defaults to None. - on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None. - on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None. - on_stopped (Optional[Callable[..., Any]], optional): Callable for on stopped. Defaults to None. + on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on success. Defaults to + None. Callable is deprecated. + on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on failure. Defaults to + None. Callable is deprecated. + on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callback for on stopped. Defaults to + None. Callable is deprecated. Returns: EnqueueData: The EnqueueData diff --git a/rq/utils.py b/rq/utils.py index 5e61983..8c34139 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -102,7 +102,11 @@ def import_attribute(name: str) -> Callable[..., Any]: attribute_bits.insert(0, module_name_bits.pop()) if module is None: - raise ValueError('Invalid attribute name: %s' % name) + # maybe it's a builtin + try: + return __builtins__[name] + except KeyError: + raise ValueError('Invalid attribute name: %s' % name) attribute_name = '.'.join(attribute_bits) if hasattr(module, attribute_name): diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 6ceaedb..8aa9ad4 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -1,7 +1,7 @@ from datetime import timedelta from rq import Queue, Worker -from rq.job import UNEVALUATED, Job, JobStatus +from rq.job import UNEVALUATED, Callback, Job, JobStatus from rq.serializers import JSONSerializer from rq.worker import SimpleWorker from tests import RQTestCase @@ -35,6 +35,17 @@ class QueueCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.success_callback, print) + # test string callbacks + job = queue.enqueue(say_hello, on_success=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + def test_enqueue_with_failure_callback(self): """queue.enqueue* methods with on_failure is persisted correctly""" queue = Queue(connection=self.testconn) @@ -53,6 +64,17 @@ class QueueCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.failure_callback, print) + # test string callbacks + job = queue.enqueue(say_hello, on_failure=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) + + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) + def test_enqueue_with_stopped_callback(self): """queue.enqueue* methods with on_stopped is persisted correctly""" queue = Queue(connection=self.testconn) @@ -71,6 +93,17 @@ class QueueCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.stopped_callback, print) + # test string callbacks + job = queue.enqueue(long_process, on_stopped=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.stopped_callback, print) + + job = queue.enqueue_in(timedelta(seconds=10), long_process, on_stopped=Callback("print")) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.stopped_callback, print) + class SyncJobCallback(RQTestCase): def test_success_callback(self): @@ -85,6 +118,15 @@ class SyncJobCallback(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + # test string callbacks + job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result")) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.result) + + job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + def test_failure_callback(self): """queue.enqueue* methods with on_failure is persisted correctly""" queue = Queue(is_async=False) @@ -97,17 +139,34 @@ class SyncJobCallback(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + # test string callbacks + job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception")) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) + + job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + def test_stopped_callback(self): """queue.enqueue* methods with on_stopped is persisted correctly""" connection = self.testconn queue = Queue('foo', connection=connection, serializer=JSONSerializer) worker = SimpleWorker('foo', connection=connection, serializer=JSONSerializer) + job = queue.enqueue(long_process, on_stopped=save_result_if_not_stopped) job.execute_stopped_callback( worker.death_penalty_class ) # Calling execute_stopped_callback directly for coverage self.assertTrue(self.testconn.exists('stopped_callback:%s' % job.id)) + # test string callbacks + job = queue.enqueue(long_process, on_stopped=Callback("tests.fixtures.save_result_if_not_stopped")) + job.execute_stopped_callback( + worker.death_penalty_class + ) # Calling execute_stopped_callback directly for coverage + self.assertTrue(self.testconn.exists('stopped_callback:%s' % job.id)) + class WorkerCallbackTestCase(RQTestCase): def test_success_callback(self): @@ -115,9 +174,8 @@ class WorkerCallbackTestCase(RQTestCase): queue = Queue(connection=self.testconn) worker = SimpleWorker([queue]) - job = queue.enqueue(say_hello, on_success=save_result) - # Callback is executed when job is successfully executed + job = queue.enqueue(say_hello, on_success=save_result) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value()) @@ -127,6 +185,17 @@ class WorkerCallbackTestCase(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + # test string callbacks + job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.save_result")) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value()) + + job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + def test_erroneous_success_callback(self): """Test exception handling when executing success callback""" queue = Queue(connection=self.testconn) @@ -137,14 +206,18 @@ class WorkerCallbackTestCase(RQTestCase): worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) + # test string callbacks + job = queue.enqueue(say_hello, on_success=Callback("tests.fixtures.erroneous_callback")) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + def test_failure_callback(self): """Test failure callback is executed only when job a fails""" queue = Queue(connection=self.testconn) worker = SimpleWorker([queue]) - job = queue.enqueue(div_by_zero, on_failure=save_exception) - # Callback is executed when job is successfully executed + job = queue.enqueue(div_by_zero, on_failure=save_exception) worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) job.refresh() @@ -156,6 +229,19 @@ class WorkerCallbackTestCase(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + # test string callbacks + job = queue.enqueue(div_by_zero, on_failure=Callback("tests.fixtures.save_exception")) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + job.refresh() + print(job.exc_info) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) + + job = queue.enqueue(div_by_zero, on_success=Callback("tests.fixtures.save_result")) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + # TODO: add test case for error while executing failure callback @@ -179,6 +265,15 @@ class JobCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.success_callback, print) + # test string callbacks + job = Job.create(say_hello, on_success=Callback("print")) + self.assertIsNotNone(job._success_callback_name) + self.assertEqual(job.success_callback, print) + job.save() + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + def test_job_creation_with_failure_callback(self): """Ensure failure callbacks are persisted properly""" job = Job.create(say_hello) @@ -198,6 +293,15 @@ class JobCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.failure_callback, print) + # test string callbacks + job = Job.create(say_hello, on_failure=Callback("print")) + self.assertIsNotNone(job._failure_callback_name) + self.assertEqual(job.failure_callback, print) + job.save() + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) + def test_job_creation_with_stopped_callback(self): """Ensure stopped callbacks are persisted properly""" job = Job.create(say_hello) @@ -216,3 +320,12 @@ class JobCallbackTestCase(RQTestCase): job = Job.fetch(id=job.id, connection=self.testconn) self.assertEqual(job.stopped_callback, print) + + # test string callbacks + job = Job.create(say_hello, on_stopped=Callback("print")) + self.assertIsNotNone(job._stopped_callback_name) + self.assertEqual(job.stopped_callback, print) + job.save() + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.stopped_callback, print)