Add support for dependent jobs in enqueue_many (#1897)

* Add support for dependent jobs in enqueue_many

* Add module to register dependencies for multiple jobs

The get_ready_jobs function will process dependencies for an array of
jobs passed in. If any jobs' dependencies are already met, those jobs
are returned so they can be enqueued.

* Add check for jobs without dependencies

* Remove extra colon in dependencies key

This seems like a bug, but if I'm mistaken please let me know.

* Add bulk deferred jobs to Redis

Need to call queue.enqueue_job to create the job hash in redis. Since all of
these jobs are deferred, they won't be added to the queue and processed
by a worker.

* Revert "Remove extra colon in dependencies key"

This reverts commit 5ebf7a35009fcca410c43b9327203915ddfd0628.

* Enqueue jobs without dependencies separately

Any jobs without dependencies will be enqueued before handling

* Fix enqueue_many return value

* Rename ready_jobs function

* Fix enqueue_many return value

* Instantiate job category arrays before if statement

* Execute pipe to enqueue jobs with met dependencies

* Add tests for enqueue_many with dependencies

* Change dependency sorting function name

* Use common kwargs dict to create jobs

* Remove redundant tests for dependent jobs

* Alphebetize imports

* Test job with met dependencies using enqueue_many

* Fix typo

* Format with black

* Sort imports
main
Ethan Wolinsky 2 years ago committed by GitHub
parent ffacdcb675
commit b756cf82bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,27 @@
from typing import List
from redis.client import Pipeline
from redis.exceptions import WatchError
from .job import Job
class Dependency:
@classmethod
def get_jobs_with_met_dependencies(cls, jobs: List['Job'], pipeline: Pipeline):
jobs_with_met_dependencies = []
jobs_with_unmet_dependencies = []
for job in jobs:
while True:
try:
pipeline.watch(*[Job.key_for(dependency_id) for dependency_id in job._dependency_ids])
job.register_dependency(pipeline=pipeline)
if job.dependencies_are_met(pipeline=pipeline):
jobs_with_met_dependencies.append(job)
else:
jobs_with_unmet_dependencies.append(job)
pipeline.execute()
except WatchError:
continue
break
return jobs_with_met_dependencies, jobs_with_unmet_dependencies

@ -20,6 +20,7 @@ if TYPE_CHECKING:
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .dependency import Dependency
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
from .logutils import blue, green
@ -42,6 +43,7 @@ class EnqueueData(
"ttl",
"failure_ttl",
"description",
"depends_on",
"job_id",
"at_front",
"meta",
@ -714,6 +716,7 @@ class Queue:
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional[List] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
@ -733,6 +736,7 @@ class Queue:
ttl (Optional[int], optional): Time to live. Defaults to None.
failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
description (Optional[str], optional): The job description. Defaults to None.
depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None.
job_id (Optional[str], optional): The job ID. Defaults to None.
at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False.
meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None.
@ -752,6 +756,7 @@ class Queue:
ttl,
failure_ttl,
description,
depends_on,
job_id,
at_front,
meta,
@ -772,33 +777,66 @@ class Queue:
List[Job]: A list of enqueued jobs
"""
pipe = pipeline if pipeline is not None else self.connection.pipeline()
jobs = [
jobs_without_dependencies = []
jobs_with_unmet_dependencies = []
jobs_with_met_dependencies = []
def get_job_kwargs(job_data, initial_status):
return {
"func": job_data.func,
"args": job_data.args,
"kwargs": job_data.kwargs,
"result_ttl": job_data.result_ttl,
"ttl": job_data.ttl,
"failure_ttl": job_data.failure_ttl,
"description": job_data.description,
"depends_on": job_data.depends_on,
"job_id": job_data.job_id,
"meta": job_data.meta,
"status": initial_status,
"timeout": job_data.timeout,
"retry": job_data.retry,
"on_success": job_data.on_success,
"on_failure": job_data.on_failure,
}
# Enqueue jobs without dependencies
job_datas_without_dependencies = [job_data for job_data in job_datas if not job_data.depends_on]
if job_datas_without_dependencies:
jobs_without_dependencies = [
self._enqueue_job(
self.create_job(
job_data.func,
args=job_data.args,
kwargs=job_data.kwargs,
result_ttl=job_data.result_ttl,
ttl=job_data.ttl,
failure_ttl=job_data.failure_ttl,
description=job_data.description,
depends_on=None,
job_id=job_data.job_id,
meta=job_data.meta,
status=JobStatus.QUEUED,
timeout=job_data.timeout,
retry=job_data.retry,
on_success=job_data.on_success,
on_failure=job_data.on_failure,
),
self.create_job(**get_job_kwargs(job_data, JobStatus.QUEUED)),
pipeline=pipe,
at_front=job_data.at_front,
)
for job_data in job_datas
for job_data in job_datas_without_dependencies
]
if pipeline is None:
pipe.execute()
job_datas_with_dependencies = [job_data for job_data in job_datas if job_data.depends_on]
if job_datas_with_dependencies:
# Save all jobs with dependencies as deferred
jobs_with_dependencies = [
self.create_job(**get_job_kwargs(job_data, JobStatus.DEFERRED))
for job_data in job_datas_with_dependencies
]
for job in jobs_with_dependencies:
job.save(pipeline=pipe)
if pipeline is None:
pipe.execute()
# Enqueue the jobs whose dependencies have been met
jobs_with_met_dependencies, jobs_with_unmet_dependencies = Dependency.get_jobs_with_met_dependencies(
jobs_with_dependencies, pipeline=pipe
)
jobs_with_met_dependencies = [
self._enqueue_job(job, pipeline=pipe, at_front=job.enqueue_at_front)
for job in jobs_with_met_dependencies
]
if pipeline is None:
pipe.execute()
return jobs
return jobs_without_dependencies + jobs_with_unmet_dependencies + jobs_with_met_dependencies
def run_job(self, job: 'Job') -> Job:
"""Run the job

@ -103,6 +103,26 @@ class TestDependencies(RQTestCase):
self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
def test_multiple_jobs_with_dependencies(self):
"""Enqueue dependent jobs only when appropriate"""
q = Queue(connection=self.testconn)
w = SimpleWorker([q], connection=q.connection)
# Multiple jobs are enqueued with correct status
parent_job = q.enqueue(say_hello)
job_no_deps = Queue.prepare_data(say_hello)
job_with_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
jobs = q.enqueue_many([job_no_deps, job_with_deps])
self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
self.assertEqual(jobs[1].get_status(), JobStatus.DEFERRED)
w.work(burst=True, max_jobs=1)
self.assertEqual(jobs[1].get_status(), JobStatus.QUEUED)
job_with_met_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
jobs = q.enqueue_many([job_with_met_deps])
self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
q.empty()
def test_dependency_list_in_depends_on(self):
"""Enqueue with Dependency list in depends_on"""
q = Queue(connection=self.testconn)

Loading…
Cancel
Save