From 0b528dae4b763ee8f1f7af9d3a5fd85b301fa6e6 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 27 Apr 2020 14:53:23 -0400 Subject: [PATCH] Update Job#dependencies_are_met ... ... such that it fetch all dependency status using SMEMBERS and HGET rather than SORT. --- rq/job.py | 43 ++++++++++++++++++------------------------- rq/queue.py | 2 +- tests/test_job.py | 2 +- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/rq/job.py b/rq/job.py index 51b76d8..a97e4ef 100644 --- a/rq/job.py +++ b/rq/job.py @@ -4,12 +4,12 @@ from __future__ import (absolute_import, division, print_function, import inspect import warnings -import zlib from functools import partial from uuid import uuid4 -from rq.compat import as_text, decode_redis_hash, string_types, text_type +import zlib +from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError from .local import LocalStack @@ -752,35 +752,28 @@ class Job(object): method is _called_ in the _stack_ of it's dependents are being enqueued. """ - pipe = pipeline if pipeline is not None else self.connection + connection = pipeline if pipeline is not None else self.connection if pipeline is not None: - pipe.watch(*self.dependency_ids) + connection.watch(*self.dependency_ids) - sort_by = self.redis_job_namespace_prefix + '*->ended_at' - get_fields = ( - '#', - self.redis_job_namespace_prefix + '*->created_at', - self.redis_job_namespace_prefix + '*->status' - ) + dependencies_ids = {_id.decode() + for _id in connection.smembers(self.dependencies_key)} + + if exclude_job_id: + dependencies_ids.discard(exclude_job_id) - # As a minor optimization to more quickly tell if all dependencies - # are _FINISHED_, sort dependencies by the `ended_at` timestamp so - # those jobs which are not yet finished are at the start of the - # list. Sorting here lexographically works because these dates are - # stored in an ISO 8601 format, so lexographic order is the same as - # chronological order. dependencies_statuses = [ - tuple(map(as_text, result)) - for result in pipe.sort(name=self.dependencies_key, by=sort_by, - get=get_fields, alpha=True, - groups=True, ) + connection.hget(self.key_for(key), 'status') + for key + in dependencies_ids ] - # if `created_at` is None, then this has been deleted! - return all(status == JobStatus.FINISHED or not created_at - for dependency_id, created_at, status - in dependencies_statuses - if not (exclude_job_id and dependency_id == exclude_job_id)) + return all( + status.decode() == JobStatus.FINISHED + for status + in dependencies_statuses + if status + ) _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 297d5e2..bfaa0fa 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -282,7 +282,7 @@ class Queue(object): at_front=False, meta=None): """Creates a job to represent the delayed function call and enqueues it. - +nd It is much like `.enqueue()`, except that it takes the function's args and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. diff --git a/tests/test_job.py b/tests/test_job.py index 0a0f49d..670a6a4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -910,5 +910,5 @@ class TestJob(RQTestCase): w = Worker([queue]) w.work(burst=True, max_jobs=1) - assert dependent_job.get_status() == JobStatus.QUEUED assert dependent_job.dependencies_are_met() + assert dependent_job.get_status() == JobStatus.QUEUED