Update Job#dependencies_are_met ...

... such that it fetch all dependency status using SMEMBERS and HGET rather than SORT.
main
thomas 5 years ago
parent 0672cd00c6
commit 0b528dae4b

@ -4,12 +4,12 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
import zlib
from functools import partial
from uuid import uuid4
from rq.compat import as_text, decode_redis_hash, string_types, text_type
import zlib
from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError
from .local import LocalStack
@ -752,35 +752,28 @@ class Job(object):
method is _called_ in the _stack_ of it's dependents are being enqueued.
"""
pipe = pipeline if pipeline is not None else self.connection
connection = pipeline if pipeline is not None else self.connection
if pipeline is not None:
pipe.watch(*self.dependency_ids)
connection.watch(*self.dependency_ids)
sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_fields = (
'#',
self.redis_job_namespace_prefix + '*->created_at',
self.redis_job_namespace_prefix + '*->status'
)
dependencies_ids = {_id.decode()
for _id in connection.smembers(self.dependencies_key)}
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
# As a minor optimization to more quickly tell if all dependencies
# are _FINISHED_, sort dependencies by the `ended_at` timestamp so
# those jobs which are not yet finished are at the start of the
# list. Sorting here lexographically works because these dates are
# stored in an ISO 8601 format, so lexographic order is the same as
# chronological order.
dependencies_statuses = [
tuple(map(as_text, result))
for result in pipe.sort(name=self.dependencies_key, by=sort_by,
get=get_fields, alpha=True,
groups=True, )
connection.hget(self.key_for(key), 'status')
for key
in dependencies_ids
]
# if `created_at` is None, then this has been deleted!
return all(status == JobStatus.FINISHED or not created_at
for dependency_id, created_at, status
return all(
status.decode() == JobStatus.FINISHED
for status
in dependencies_statuses
if not (exclude_job_id and dependency_id == exclude_job_id))
if status
)
_job_stack = LocalStack()

@ -282,7 +282,7 @@ class Queue(object):
at_front=False, meta=None):
"""Creates a job to represent the delayed function call and enqueues
it.
nd
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.

@ -910,5 +910,5 @@ class TestJob(RQTestCase):
w = Worker([queue])
w.work(burst=True, max_jobs=1)
assert dependent_job.get_status() == JobStatus.QUEUED
assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED

Loading…
Cancel
Save