From 456743b225c0b5a4339242a5b714c378b86f2fe0 Mon Sep 17 00:00:00 2001
From: Josh Cohen
Date: Sun, 20 Jun 2021 06:18:00 -0400
Subject: [PATCH] Make `Queue.enqueue`, `Queue.enqueue_call`, `Queue.enqueue_at`,
 `Queue.parse_args` accept `pipeline` arg, add `Queue.enqueue_many` method
 (#1485)

* Make `enqueue_*` and `Queue.parse_args` accept `pipeline` arg
* undo bad docs
* Fix lints in new code
* Implement enqueue_many, refactor dependency setup out to method
* Make name consistent
* Refactor enqueue_many, update tests, add docs
* Refactor out enqueueing from dependency setup
* Move new docs to queue page
* Fix section header
* Fix section header again
* Update rq version to 1.9.0
---
 docs/docs/index.md  |  29 +++++++
 rq/queue.py         | 155 ++++++++++++++++++++++++++++++++------------
 rq/version.py       |   2 +-
 tests/test_queue.py |  96 ++++++++++++++++++++++++++-
 4 files changed, 240 insertions(+), 42 deletions(-)

diff --git a/docs/docs/index.md b/docs/docs/index.md
index d18f081..86b1d7f 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -102,6 +102,35 @@
 q = Queue('low', connection=redis_conn)
 q.enqueue('my_package.my_module.my_func', 3, 4)
 ```
+### Bulk Job Enqueueing
+_New in version 1.9.0._
+You can also enqueue multiple jobs in bulk with `queue.enqueue_many()` and `Queue.prepare_data()`:
+
+```python
+jobs = q.enqueue_many(
+    [
+        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_job_id'),
+        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_other_job_id'),
+    ]
+)
+```
+
+This will enqueue all the jobs in a single Redis `pipeline`, which you can optionally pass in yourself:
+
+```python
+with q.connection.pipeline() as pipe:
+    jobs = q.enqueue_many(
+        [
+            Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_job_id'),
+            Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_other_job_id'),
+        ],
+        pipeline=pipe
+    )
+    pipe.execute()
+```
+
+`Queue.prepare_data` accepts all arguments that `Queue.parse_args` does **EXCEPT** for `depends_on`,
+which is not supported at this time, so setting up dependencies is left to you.
 
 ## Working with Queues
 
diff --git a/rq/queue.py b/rq/queue.py
index cabcaf7..8890d27 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function,
 
 import uuid
 import warnings
+from collections import namedtuple
 from datetime import datetime, timezone
 from distutils.version import StrictVersion
 
@@ -23,6 +24,17 @@ def compact(lst):
     return [item for item in lst if item is not None]
 
 
+class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout",
+                                             "result_ttl", "ttl", "failure_ttl",
+                                             "description", "job_id",
+                                             "at_front", "meta", "retry"])):
+    """Helper type to use when calling enqueue_many
+    NOTE: Does not support `depends_on` yet.
+    """
+
+    __slots__ = ()
+
+
 @total_ordering
 class Queue:
     job_class = Job
@@ -310,10 +322,56 @@
 
         return job
 
+    def setup_dependencies(
+        self,
+        job,
+        pipeline=None
+    ):
+        # If a _dependent_ job depends on any unfinished job, register all the
+        # _dependent_ job's dependencies instead of enqueueing it.
+        #
+        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
+        # WatchError is raised when the pipeline is executed, that means
+        # something else has modified either the set of dependencies or the
+        # status of one of them. In this case, we simply retry.
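+        #
+        # Pipeline ownership: when the caller passes a pipeline it is NOT
+        # executed here and any WatchError is re-raised for the caller to
+        # handle; with no pipeline, an internal one is created, executed if
+        # the job gets deferred, and the dependency check is retried on
+        # conflict.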
+        if len(job._dependency_ids) > 0:
+            pipe = pipeline if pipeline is not None else self.connection.pipeline()
+            while True:
+                try:
+                    # Also call WATCH even if the caller passed in a
+                    # pipeline, since Queue#create_job is called from
+                    # within this method.
+                    pipe.watch(job.dependencies_key)
+
+                    dependencies = job.fetch_dependencies(
+                        watch=True,
+                        pipeline=pipe
+                    )
+
+                    pipe.multi()
+
+                    for dependency in dependencies:
+                        if dependency.get_status(refresh=False) != JobStatus.FINISHED:
+                            job.set_status(JobStatus.DEFERRED, pipeline=pipe)
+                            job.register_dependency(pipeline=pipe)
+                            job.save(pipeline=pipe)
+                            job.cleanup(ttl=job.ttl, pipeline=pipe)
+                            if pipeline is None:
+                                pipe.execute()
+                            return job
+                    break
+                except WatchError:
+                    if pipeline is None:
+                        continue
+                    else:
+                        # If the pipeline comes from the caller, re-raise to them
+                        raise
+        return job
+
     def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                      result_ttl=None, ttl=None, failure_ttl=None,
                      description=None, depends_on=None, job_id=None,
-                     at_front=False, meta=None, retry=None):
+                     at_front=False, meta=None, retry=None, pipeline=None):
         """Creates a job to represent the delayed function call and enqueues it.
@@ -329,42 +387,58 @@
             retry=retry
         )
 
-        # If a _dependent_ job depends on any unfinished job, register all the
-        # _dependent_ job's dependencies instead of enqueueing it.
-        #
-        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
-        # WatchError is raised when the pipeline is executed, that means
-        # something else has modified either the set of dependencies or the
-        # status of one of them. In this case, we simply retry.
-        if depends_on is not None:
-            with self.connection.pipeline() as pipe:
-                while True:
-                    try:
-
-                        pipe.watch(job.dependencies_key)
-
-                        dependencies = job.fetch_dependencies(
-                            watch=True,
-                            pipeline=pipe
-                        )
-
-                        pipe.multi()
-
-                        for dependency in dependencies:
-                            if dependency.get_status(refresh=False) != JobStatus.FINISHED:
-                                job.set_status(JobStatus.DEFERRED, pipeline=pipe)
-                                job.register_dependency(pipeline=pipe)
-                                job.save(pipeline=pipe)
-                                job.cleanup(ttl=job.ttl, pipeline=pipe)
-                                pipe.execute()
-                                return job
+        job = self.setup_dependencies(
+            job,
+            pipeline=pipeline
+        )
+        # If the job does not depend on an unfinished job, enqueue it now.
+        if job.get_status(refresh=False) != JobStatus.DEFERRED:
+            return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
+        return job
 
-                        break
-                    except WatchError:
-                        continue
+    @staticmethod
+    def prepare_data(func, args=None, kwargs=None, timeout=None,
+                     result_ttl=None, ttl=None, failure_ttl=None,
+                     description=None, job_id=None,
+                     at_front=False, meta=None, retry=None):
+        # Need this until support is dropped for Python < 3.7, where named
+        # tuples gain default values and this logic can live within
+        # EnqueueData itself.
+        return EnqueueData(
+            func, args, kwargs, timeout,
+            result_ttl, ttl, failure_ttl,
+            description, job_id,
+            at_front, meta, retry
+        )
 
-        job = self.enqueue_job(job, at_front=at_front)
-        return job
+    def enqueue_many(
+        self,
+        job_datas,
+        pipeline=None
+    ):
+        """
+        Creates multiple jobs (created via `Queue.prepare_data` calls)
+        to represent the delayed function calls and enqueues them.
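+
+        Note: `depends_on` is not supported here (see `Queue.prepare_data`),
+        so every job is created without dependencies and enqueued immediately.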
+        """
+        pipe = pipeline if pipeline is not None else self.connection.pipeline()
+        jobs = [
+            self.enqueue_job(
+                self.create_job(
+                    job_data.func, args=job_data.args, kwargs=job_data.kwargs,
+                    result_ttl=job_data.result_ttl, ttl=job_data.ttl,
+                    failure_ttl=job_data.failure_ttl, description=job_data.description,
+                    depends_on=None,
+                    job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED,
+                    timeout=job_data.timeout, retry=job_data.retry
+                ),
+                pipeline=pipe,
+                at_front=job_data.at_front
+            )
+            for job_data in job_datas
+        ]
+        if pipeline is None:
+            pipe.execute()
+        return jobs
 
     def run_job(self, job):
         job.perform()
@@ -401,6 +475,7 @@
         at_front = kwargs.pop('at_front', False)
         meta = kwargs.pop('meta', None)
         retry = kwargs.pop('retry', None)
+        pipeline = kwargs.pop('pipeline', None)
 
         if 'args' in kwargs or 'kwargs' in kwargs:
             assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs'  # noqa
@@ -408,32 +483,32 @@
             kwargs = kwargs.pop('kwargs', None)
 
         return (f, timeout, description, result_ttl, ttl, failure_ttl,
-                depends_on, job_id, at_front, meta, retry, args, kwargs)
+                depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs)
 
     def enqueue(self, f, *args, **kwargs):
         """Creates a job to represent the delayed function call and enqueues it."""
 
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+         depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
 
         return self.enqueue_call(
             func=f, args=args, kwargs=kwargs, timeout=timeout,
             result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
             description=description, depends_on=depends_on, job_id=job_id,
-            at_front=at_front, meta=meta, retry=retry
+            at_front=at_front, meta=meta, retry=retry, pipeline=pipeline
         )
 
     def enqueue_at(self, datetime, f, *args, **kwargs):
         """Schedules a job to be enqueued at specified time"""
 
         (f, timeout, description, result_ttl, ttl, failure_ttl,
-         depends_on, job_id, at_front, meta, retry, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+         depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
         job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
                               timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                               failure_ttl=failure_ttl, description=description,
                               depends_on=depends_on, job_id=job_id, meta=meta, retry=retry)
 
-        return self.schedule_job(job, datetime)
+        return self.schedule_job(job, datetime, pipeline=pipeline)
 
     def schedule_job(self, job, datetime, pipeline=None):
         """Puts job on ScheduledJobRegistry"""
diff --git a/rq/version.py b/rq/version.py
index 2658962..92b65c7 100644
--- a/rq/version.py
+++ b/rq/version.py
@@ -2,4 +2,4 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-VERSION = '1.8.1'
+VERSION = '1.9.0'
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 9b61b5a..9d00545 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -35,6 +35,7 @@ class MultipleDependencyJob(Job):
         _job._dependency_ids = dependency_ids
         return _job
 
+
 class TestQueue(RQTestCase):
     def test_create_queue(self):
         """Creating queues."""
@@ -507,6 +508,99 @@ class TestQueue(RQTestCase):
         self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
         self.assertEqual(job.get_status(), JobStatus.QUEUED)
 
+    def test_enqueue_job_with_dependency_and_pipeline(self):
+        """Jobs are enqueued only when their dependencies are finished, and
+        the pipeline is executed by the caller when one is passed in."""
+        # Job with unfinished dependency is not immediately enqueued
+        parent_job = Job.create(func=say_hello)
+        parent_job.save()
+        q = Queue()
+        with q.connection.pipeline() as pipe:
+            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
+            self.assertEqual(q.job_ids, [])
+            self.assertEqual(job.get_status(refresh=False), JobStatus.DEFERRED)
+            # Not in registry before execute, since pipeline was passed in
+            self.assertEqual(len(q.deferred_job_registry), 0)
+            pipe.execute()
+            # Only in registry after execute, since pipeline was passed in
+            self.assertEqual(len(q.deferred_job_registry), 1)
+
+        # Jobs dependent on finished jobs are immediately enqueued
+        parent_job.set_status(JobStatus.FINISHED)
+        parent_job.save()
+        with q.connection.pipeline() as pipe:
+            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
+            # Pre-execute conditions
+            self.assertEqual(q.job_ids, [])
+            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
+            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
+            pipe.execute()
+            # Post-execute conditions
+            self.assertEqual(q.job_ids, [job.id])
+            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
+            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
+
+    def test_enqueue_many_internal_pipeline(self):
+        """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
+        (but at_front still applies)"""
+        q = Queue()
+        job_1_data = Queue.prepare_data(
+            say_hello,
+            job_id='fake_job_id_1',
+            at_front=False
+        )
+        job_2_data = Queue.prepare_data(
+            say_hello,
+            job_id='fake_job_id_2',
+            at_front=False
+        )
+        job_3_data = Queue.prepare_data(
+            say_hello,
+            job_id='fake_job_id_3',
+            at_front=True
+        )
+        jobs = q.enqueue_many(
+            [job_1_data, job_2_data, job_3_data],
+        )
+        for job in jobs:
+            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
+        # Queued immediately, since enqueue_many executed its internal pipeline
+        self.assertEqual(len(q), 3)
+        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
+
+    def test_enqueue_many_with_passed_pipeline(self):
+        """Jobs should be enqueued in bulk with a passed pipeline, enqueued in order provided
+        (but at_front still applies)"""
+        q = Queue()
+        with q.connection.pipeline() as pipe:
+            job_1_data = Queue.prepare_data(
+                say_hello,
+                job_id='fake_job_id_1',
+                at_front=False
+            )
+            job_2_data = Queue.prepare_data(
+                say_hello,
+                job_id='fake_job_id_2',
+                at_front=False
+            )
+            job_3_data = Queue.prepare_data(
+                say_hello,
+                job_id='fake_job_id_3',
+                at_front=True
+            )
+            jobs = q.enqueue_many(
+                [job_1_data, job_2_data, job_3_data],
+                pipeline=pipe
+            )
+            self.assertEqual(q.job_ids, [])
+            for job in jobs:
+                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
+            pipe.execute()
+            # Only queued after execute, since pipeline was passed in
+            self.assertEqual(len(q), 3)
+            self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
+
     def test_enqueue_job_with_dependency_by_id(self):
         """Can specify job dependency with job object or job id."""
         parent_job = Job.create(func=say_hello)
@@ -653,7 +747,7 @@ class TestQueue(RQTestCase):
         self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
         self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
         self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
-    
+
     def test_enqueue_with_retry(self):
         """Enqueueing with retry_strategy works"""
         queue = Queue('example', connection=self.testconn)
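For reference, a minimal usage sketch of the two pipeline modes this patch adds to `enqueue_many` (not part of the patch itself; the `count_words_at_url` task and the `my_module` import are assumed placeholders, as in the docs above):

```python
from redis import Redis
from rq import Queue

# Hypothetical task module; any importable function works here.
from my_module import count_words_at_url

q = Queue(connection=Redis())

# Internal pipeline: enqueue_many() creates and executes its own pipeline,
# so the jobs are visible in Redis as soon as the call returns.
jobs = q.enqueue_many([
    Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='bulk_1'),
    Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='bulk_2'),
])

# Caller-owned pipeline: nothing is sent to Redis until pipe.execute(),
# so the bulk enqueue can be batched atomically with other commands.
with q.connection.pipeline() as pipe:
    more_jobs = q.enqueue_many(
        [Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='bulk_3')],
        pipeline=pipe,
    )
    pipe.execute()
```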