Add feature to enqueue dependents at the front of queues (#1696)

* Add feature to enqueue dependents at the front of queues

* Add documentation for the Dependency(enqueue_at_front=...) parameter

* docs: Add `enqueue_at_front` to list of parameters for Dependency

* test: Update dependency test to not rely on Redis ordering

* refactor: Save enqueue_at_front boolean in job.meta instead of separate instance attr

* fix: Made enqueue_at_front an instance attribute instead of putting it inside meta
main
Jahn Thomas Fidje 2 years ago committed by GitHub
parent 108c2ea666
commit 8404385592
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -167,6 +167,7 @@ The `Dependency(jobs=...)` parameter accepts:
- a string representing a single job id - a string representing a single job id
- a Job object - a Job object
- an iteratable of job id strings and/or Job objects - an iteratable of job id strings and/or Job objects
- `enqueue_at_front` boolean parameter to put dependents at the front when they are enqueued
Example: Example:
@ -177,9 +178,17 @@ from rq import Queue
queue = Queue(connection=Redis()) queue = Queue(connection=Redis())
job_1 = queue.enqueue(div_by_zero) job_1 = queue.enqueue(div_by_zero)
dependency = Dependency(jobs=[job_1], allow_failure=True) # allow_failure defaults to False dependency = Dependency(
jobs=[job_1],
allow_failure=True, # allow_failure defaults to False
enqueue_at_front=True # enqueue_at_front defaults to False
)
job_2 = queue.enqueue(say_hello, depends_on=dependency) job_2 = queue.enqueue(say_hello, depends_on=dependency)
# job_2 will execute even though its dependency (job_1) fails
"""
job_2 will execute even though its dependency (job_1) fails,
and it will be enqueued at the front of the queue.
"""
``` ```
@ -269,10 +278,10 @@ There are two options:
#### Arguments: #### Arguments:
| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) | | | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) |
|-|-|-|-| | ---------- | --------------- | ---------------- | --------------------------------------------------------------------------- |
| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` | | keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` |
| no keyword | `[value]` | `:[value]` | `%[value]` | | no keyword | `[value]` | `:[value]` | `%[value]` |
Where `[key]` is the keyword and `[value]` is the value which is parsed with the corresponding Where `[key]` is the keyword and `[value]` is the value which is parsed with the corresponding
parsing method. parsing method.

@ -39,7 +39,7 @@ class JobStatus(str, Enum):
class Dependency: class Dependency:
def __init__(self, jobs, allow_failure: bool = False): def __init__(self, jobs, allow_failure: bool = False, enqueue_at_front: bool = False):
jobs = ensure_list(jobs) jobs = ensure_list(jobs)
if not all( if not all(
isinstance(job, Job) or isinstance(job, str) isinstance(job, Job) or isinstance(job, str)
@ -52,6 +52,7 @@ class Dependency:
self.dependencies = jobs self.dependencies = jobs
self.allow_failure = allow_failure self.allow_failure = allow_failure
self.enqueue_at_front = enqueue_at_front
# Sentinel value to mark that some of our lazily evaluated properties have not # Sentinel value to mark that some of our lazily evaluated properties have not
@ -151,6 +152,7 @@ class Job:
# dependency could be job instance or id, or iterable thereof # dependency could be job instance or id, or iterable thereof
if depends_on is not None: if depends_on is not None:
if isinstance(depends_on, Dependency): if isinstance(depends_on, Dependency):
job.enqueue_at_front = depends_on.enqueue_at_front
job.allow_dependency_failures = depends_on.allow_failure job.allow_dependency_failures = depends_on.allow_failure
depends_on_list = depends_on.dependencies depends_on_list = depends_on.dependencies
else: else:
@ -429,6 +431,7 @@ class Job:
self.redis_server_version = None self.redis_server_version = None
self.last_heartbeat = None self.last_heartbeat = None
self.allow_dependency_failures = None self.allow_dependency_failures = None
self.enqueue_at_front = None
def __repr__(self): # noqa # pragma: no cover def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@ -586,6 +589,7 @@ class Job:
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else []) else [dep_id.decode()] if dep_id else [])
self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@ -669,6 +673,9 @@ class Job:
# convert boolean to integer to avoid redis.exception.DataError # convert boolean to integer to avoid redis.exception.DataError
obj["allow_dependency_failures"] = int(self.allow_dependency_failures) obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
if self.enqueue_at_front is not None:
obj["enqueue_at_front"] = int(self.enqueue_at_front)
return obj return obj
def save(self, pipeline=None, include_meta=True): def save(self, pipeline=None, include_meta=True):

@ -325,9 +325,6 @@ class Queue:
job.retries_left = retry.max job.retries_left = retry.max
job.retry_intervals = retry.intervals job.retry_intervals = retry.intervals
if isinstance(depends_on, Dependency):
job.allow_dependency_failures = depends_on.allow_failure
return job return job
def setup_dependencies( def setup_dependencies(
@ -648,16 +645,19 @@ class Queue:
break break
for dependent in jobs_to_enqueue: for dependent in jobs_to_enqueue:
enqueue_at_front = dependent.enqueue_at_front or False
registry = DeferredJobRegistry(dependent.origin, registry = DeferredJobRegistry(dependent.origin,
self.connection, self.connection,
job_class=self.job_class, job_class=self.job_class,
serializer=self.serializer) serializer=self.serializer)
registry.remove(dependent, pipeline=pipe) registry.remove(dependent, pipeline=pipe)
if dependent.origin == self.name: if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipe) self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
else: else:
queue = self.__class__(name=dependent.origin, connection=self.connection) queue = self.__class__(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe) queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
# Only delete dependents_key if all dependents have been enqueued # Only delete dependents_key if all dependents have been enqueued
if len(jobs_to_enqueue) == len(dependent_job_ids): if len(jobs_to_enqueue) == len(dependent_job_ids):

@ -98,6 +98,25 @@ class TestDependencies(RQTestCase):
job = Job.fetch(job.id, connection=self.testconn) job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED)
# Test dependant is enqueued at front
q.empty()
parent_job = q.enqueue(say_hello)
q.enqueue(
say_hello,
job_id='fake_job_id_1',
depends_on=Dependency(jobs=[parent_job])
)
q.enqueue(
say_hello,
job_id='fake_job_id_2',
depends_on=Dependency(jobs=[parent_job],enqueue_at_front=True)
)
#q.enqueue(say_hello) # This is a filler job that will act as a separator for jobs, one will be enqueued at front while the other one at the end of the queue
w.work(burst=True, max_jobs=1)
self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
def test_dependencies_are_met_if_parent_is_canceled(self): def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed""" """When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)

Loading…
Cancel
Save