Make `Queue.enqueue`, `Queue.enqueue_call`, `Queue.enqueue_at`, and `Queue.parse_args` accept a `pipeline` arg, add `Queue.enqueue_many` method (#1485)

* Make `enqueue_*` and `Queue.parse_args` accept `pipeline` arg

* undo bad docs

* Fix lints in new code

* Implement enqueue_many, refactor dependency setup out to method

* Make name consistent

* Refactor enqueue_many, update tests, add docs

* Refactor out enqueueing from dependency setup

* Move new docs to queue page

* Fix section header

* Fix section header again

* Update rq version to 1.9.0
Josh Cohen committed 4 years ago
parent 35604f9bb1
commit 456743b225

@@ -102,6 +102,35 @@ q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)
```
### Bulk Job Enqueueing
_New in version 1.9.0._
You can also enqueue multiple jobs in bulk with `queue.enqueue_many()` and `Queue.prepare_data()`:
```python
jobs = q.enqueue_many(
    [
        Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_job_id'),
        Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_other_job_id'),
    ]
)
```
This enqueues all the jobs in a single Redis `pipeline`, which you can optionally pass in yourself:
```python
with q.connection.pipeline() as pipe:
    jobs = q.enqueue_many(
        [
            Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_job_id'),
            Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_other_job_id'),
        ],
        pipeline=pipe
    )
    pipe.execute()
```
`Queue.prepare_data` accepts all arguments that `Queue.parse_args` does **EXCEPT** for `depends_on`,
which is not supported at this time, so setting up dependencies is left to you.
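Until then, one possible workaround (a sketch, not part of this changeset) is to bulk-enqueue the independent jobs and fall back to the single-job API, which does support `depends_on`, for the dependents:
```python
# Hypothetical workaround: bulk-enqueue the independent jobs, then use
# enqueue(), which supports depends_on, for the job that depends on them.
parent_jobs = q.enqueue_many(
    [
        Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='parent_job'),
    ]
)
child_job = q.enqueue(count_words_at_url, 'http://nvie.com', depends_on=parent_jobs[0])
```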
## Working with Queues

@@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
from collections import namedtuple
from datetime import datetime, timezone
from distutils.version import StrictVersion
@@ -23,6 +24,17 @@ def compact(lst):
    return [item for item in lst if item is not None]
class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout",
                                             "result_ttl", "ttl", "failure_ttl",
                                             "description", "job_id",
                                             "at_front", "meta", "retry"])):
    """Helper type to use when calling enqueue_many
    NOTE: Does not support `depends_on` yet.
    """

    __slots__ = ()
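As an aside, on Python 3.7+ the same shape could carry its own defaults via `namedtuple`'s `defaults` argument; a hypothetical sketch (not part of this commit) of what the `prepare_data` helper below works around:
```python
from collections import namedtuple

# Python 3.7+ only: with field defaults, a prepare_data-style factory
# would be unnecessary. EnqueueDataWithDefaults is a hypothetical name.
EnqueueDataWithDefaults = namedtuple(
    'EnqueueDataWithDefaults',
    ["func", "args", "kwargs", "timeout",
     "result_ttl", "ttl", "failure_ttl",
     "description", "job_id",
     "at_front", "meta", "retry"],
    # One default per field, right-aligned: everything except func is optional
    defaults=[None, None, None, None, None, None, None, None, False, None, None]
)

data = EnqueueDataWithDefaults(func=print, job_id='my_job_id')
```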
@total_ordering
class Queue:
    job_class = Job
@@ -310,10 +322,56 @@
        return job
    def setup_dependencies(
            self,
            job,
            pipeline=None
    ):
        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, that means
        # something else has modified either the set of dependencies or the
        # status of one of them. In this case, we simply retry.
        if len(job._dependency_ids) > 0:
            pipe = pipeline if pipeline is not None else self.connection.pipeline()
            while True:
                try:
                    # Also calling watch even if caller
                    # passed in a pipeline since Queue#create_job
                    # is called from within this method.
                    pipe.watch(job.dependencies_key)

                    dependencies = job.fetch_dependencies(
                        watch=True,
                        pipeline=pipe
                    )

                    pipe.multi()

                    for dependency in dependencies:
                        if dependency.get_status(refresh=False) != JobStatus.FINISHED:
                            job.set_status(JobStatus.DEFERRED, pipeline=pipe)
                            job.register_dependency(pipeline=pipe)
                            job.save(pipeline=pipe)
                            job.cleanup(ttl=job.ttl, pipeline=pipe)
                            if pipeline is None:
                                pipe.execute()
                            return job
                    break
                except WatchError:
                    if pipeline is None:
                        continue
                    else:
                        # if pipeline comes from caller, re-raise to them
                        raise
        return job
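The comment in `setup_dependencies` references redis-py's optimistic locking idiom; for readers unfamiliar with it, here is a standalone sketch of the WATCH/MULTI/retry pattern (generic redis-py, not RQ code):
```python
import redis
from redis import WatchError

r = redis.Redis()

def transfer(src, dst, amount):
    """Atomically move `amount` between two counters using optimistic locking."""
    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(src, dst)               # watch both keys; reads now run immediately
                balance = int(pipe.get(src) or 0)
                if balance < amount:
                    pipe.unwatch()
                    return False
                pipe.multi()                       # switch back to buffered/transactional mode
                pipe.decrby(src, amount)
                pipe.incrby(dst, amount)
                pipe.execute()                     # raises WatchError if src or dst changed
                return True
            except WatchError:
                continue                           # lost the race; retry from the top
```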
    def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                     result_ttl=None, ttl=None, failure_ttl=None,
                     description=None, depends_on=None, job_id=None,
                     at_front=False, meta=None, retry=None, pipeline=None):
        """Creates a job to represent the delayed function call and enqueues
        it.

@@ -329,42 +387,58 @@
            retry=retry
        )
        job = self.setup_dependencies(
            job,
            pipeline=pipeline
        )
        # If we do not depend on an unfinished job, enqueue the job.
        if job.get_status(refresh=False) != JobStatus.DEFERRED:
            return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
        return job

    @staticmethod
    def prepare_data(func, args=None, kwargs=None, timeout=None,
                     result_ttl=None, ttl=None, failure_ttl=None,
                     description=None, job_id=None,
                     at_front=False, meta=None, retry=None):
        # Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
        # And can keep this logic within EnqueueData
        return EnqueueData(
            func, args, kwargs, timeout,
            result_ttl, ttl, failure_ttl,
            description, job_id,
            at_front, meta, retry
        )

    def enqueue_many(
            self,
            job_datas,
            pipeline=None
    ):
        """
        Creates multiple jobs (created via `Queue.prepare_data` calls)
        to represent the delayed function calls and enqueues them.
        """
        pipe = pipeline if pipeline is not None else self.connection.pipeline()
        jobs = [
            self.enqueue_job(
                self.create_job(
                    job_data.func, args=job_data.args, kwargs=job_data.kwargs, result_ttl=job_data.result_ttl,
                    ttl=job_data.ttl,
                    failure_ttl=job_data.failure_ttl, description=job_data.description,
                    depends_on=None,
                    job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED,
                    timeout=job_data.timeout,
                    retry=job_data.retry
                ),
                pipeline=pipe,
                at_front=job_data.at_front
            )
            for job_data in job_datas
        ]
        if pipeline is None:
            pipe.execute()
        return jobs
    def run_job(self, job):
        job.perform()
@@ -401,6 +475,7 @@
        at_front = kwargs.pop('at_front', False)
        meta = kwargs.pop('meta', None)
        retry = kwargs.pop('retry', None)
        pipeline = kwargs.pop('pipeline', None)
        if 'args' in kwargs or 'kwargs' in kwargs:
            assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs'  # noqa
@@ -408,32 +483,32 @@
            kwargs = kwargs.pop('kwargs', None)

        return (f, timeout, description, result_ttl, ttl, failure_ttl,
                depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs)
    def enqueue(self, f, *args, **kwargs):
        """Creates a job to represent the delayed function call and enqueues it."""
        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)

        return self.enqueue_call(
            func=f, args=args, kwargs=kwargs, timeout=timeout,
            result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
            description=description, depends_on=depends_on, job_id=job_id,
            at_front=at_front, meta=meta, retry=retry, pipeline=pipeline
        )
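With `parse_args` now popping `pipeline`, plain `enqueue()` supports the same caller-owned pipeline pattern as `enqueue_many`; a brief usage sketch (reusing `q` and `count_words_at_url` from the docs above):
```python
# Defer the Redis writes for a single job to a caller-owned pipeline;
# nothing is actually enqueued until pipe.execute() runs.
with q.connection.pipeline() as pipe:
    job = q.enqueue(count_words_at_url, 'http://nvie.com', pipeline=pipe)
    pipe.execute()
```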
    def enqueue_at(self, datetime, f, *args, **kwargs):
        """Schedules a job to be enqueued at specified time"""
        (f, timeout, description, result_ttl, ttl, failure_ttl,
         depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
        job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
                              timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                              failure_ttl=failure_ttl, description=description,
                              depends_on=depends_on, job_id=job_id, meta=meta, retry=retry)

        return self.schedule_job(job, datetime, pipeline=pipeline)

    def schedule_job(self, job, datetime, pipeline=None):
        """Puts job on ScheduledJobRegistry"""

@@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

VERSION = '1.9.0'

@@ -35,6 +35,7 @@ class MultipleDependencyJob(Job):
        _job._dependency_ids = dependency_ids
        return _job


class TestQueue(RQTestCase):
    def test_create_queue(self):
        """Creating queues."""
@@ -507,6 +508,99 @@ class TestQueue(RQTestCase):
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)
    def test_enqueue_job_with_dependency_and_pipeline(self):
        """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello)
        parent_job.save()
        q = Queue()
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.get_status(refresh=False), JobStatus.DEFERRED)
            # Not in registry before execute, since passed in pipeline
            self.assertEqual(len(q.deferred_job_registry), 0)
            pipe.execute()
        # Only in registry after execute, since passed in pipeline
        self.assertEqual(len(q.deferred_job_registry), 1)

        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            # Pre execute conditions
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()
        # Post execute conditions
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
    def test_enqueue_many_internal_pipeline(self):
        """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
        (but at_front still applies)"""
        q = Queue()
        job_1_data = Queue.prepare_data(
            say_hello,
            job_id='fake_job_id_1',
            at_front=False
        )
        job_2_data = Queue.prepare_data(
            say_hello,
            job_id='fake_job_id_2',
            at_front=False
        )
        job_3_data = Queue.prepare_data(
            say_hello,
            job_id='fake_job_id_3',
            at_front=True
        )
        jobs = q.enqueue_many(
            [job_1_data, job_2_data, job_3_data],
        )
        for job in jobs:
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
        # Jobs are enqueued immediately since enqueue_many used its own internal pipeline
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
    def test_enqueue_many_with_passed_pipeline(self):
        """Jobs should be enqueued in bulk with a passed pipeline, enqueued in order provided
        (but at_front still applies)"""
        q = Queue()
        with q.connection.pipeline() as pipe:
            job_1_data = Queue.prepare_data(
                say_hello,
                job_id='fake_job_id_1',
                at_front=False
            )
            job_2_data = Queue.prepare_data(
                say_hello,
                job_id='fake_job_id_2',
                at_front=False
            )
            job_3_data = Queue.prepare_data(
                say_hello,
                job_id='fake_job_id_3',
                at_front=True
            )
            jobs = q.enqueue_many(
                [job_1_data, job_2_data, job_3_data],
                pipeline=pipe
            )
            self.assertEqual(q.job_ids, [])
            for job in jobs:
                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()
        # Only in queue after execute, since a caller-owned pipeline was passed
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
    def test_enqueue_job_with_dependency_by_id(self):
        """Can specify job dependency with job object or job id."""
        parent_job = Job.create(func=say_hello)
@@ -653,7 +747,7 @@ class TestQueue(RQTestCase):
        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))

    def test_enqueue_with_retry(self):
        """Enqueueing with retry_strategy works"""
        queue = Queue('example', connection=self.testconn)
