From 7ea5a32a55c59b05da849b3aa3d9e20a135df45a Mon Sep 17 00:00:00 2001
From: thomas
Date: Sat, 14 Dec 2019 23:06:22 -0500
Subject: [PATCH] Always set status 'FINISHED' when job is successful

Method Queue#enqueue_dependents checks the status of all dependencies of
all dependents, and enqueues those dependents for which all dependencies
are FINISHED.

The enqueue_dependents method was called from Worker#handle_job_success
BEFORE the status of the successful job was set in Redis, so
enqueue_dependents explicitly excluded the _successful_ job from the
interrogation of dependency statuses, as its status would never be
FINISHED in the existing code path; it was simply assumed that FINISHED
would be the final status once the current pipeline was executed.

This commit changes Worker#handle_job_success so that it persists the
status of the successful job to Redis every time a job completes (not
only when it has a result TTL), and does so before enqueue_dependents is
called. This allows enqueue_dependents to be less reliant on the
out-of-band state of the current _successful job being handled_.
---
 rq/queue.py         |  1 -
 rq/worker.py        | 32 ++++++++++++++++++--------------
 tests/test_queue.py | 27 ++++++++++++++++++++-------
 3 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/rq/queue.py b/rq/queue.py
index a354d1c..57aa3a6 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -480,7 +480,6 @@ class Queue(object):
                     status == JobStatus.FINISHED
                     for job_id, status
                     in dependents_dependencies
-                    if job_id != job.id
                 ):
                     continue
 
diff --git a/rq/worker.py b/rq/worker.py
index 7e090f5..553b457 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -15,20 +15,15 @@ import warnings
 from datetime import timedelta
 from uuid import uuid4
 
-try:
-    from signal import SIGKILL
-except ImportError:
-    from signal import SIGTERM as SIGKILL
-
 from redis import WatchError
 
 from . import worker_registration
 from .compat import PY2, as_text, string_types, text_type
-from .connections import get_current_connection, push_connection, pop_connection
-
-from .defaults import (DEFAULT_RESULT_TTL,
-                       DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
-                       DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
+from .connections import (get_current_connection, pop_connection,
+                          push_connection)
+from .defaults import (DEFAULT_JOB_MONITORING_INTERVAL,
+                       DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT,
+                       DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL)
 from .exceptions import DequeueTimeout, ShutDownImminentException
 from .job import Job, JobStatus
 from .logutils import setup_loghandlers
@@ -36,13 +31,22 @@ from .queue import Queue
 from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
 from .scheduler import RQScheduler
 from .suspension import is_suspended
-from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import (backend_class, ensure_list, enum,
-                    make_colorizer, utcformat, utcnow, utcparse)
+from .timeouts import (HorseMonitorTimeoutException, JobTimeoutException,
+                       UnixSignalDeathPenalty)
+from .utils import (backend_class, ensure_list, enum, make_colorizer,
+                    utcformat, utcnow, utcparse)
 from .version import VERSION
 from .worker_registration import clean_worker_registry, get_keys
 from .serializers import resolve_serializer
 
+try:
+    from signal import SIGKILL
+except ImportError:
+    from signal import SIGTERM as SIGKILL
+
+
+
+
 try:
     from setproctitle import setproctitle as setprocname
 except ImportError:
@@ -845,6 +849,7 @@ class Worker(object):
                     # if dependencies are inserted after enqueue_dependents
                     # a WatchError is thrown by execute()
                     pipeline.watch(job.dependents_key)
+                    job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                     # enqueue_dependents calls multi() on the pipeline!
                     queue.enqueue_dependents(job, pipeline=pipeline)
 
@@ -856,7 +861,6 @@ class Worker(object):
 
                     result_ttl = job.get_result_ttl(self.default_result_ttl)
                     if result_ttl != 0:
-                        job.set_status(JobStatus.FINISHED, pipeline=pipeline)
                         # Don't clobber the user's meta dictionary!
                         job.save(pipeline=pipeline, include_meta=False)
 
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 4b43677..f776747 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -411,6 +411,9 @@ class TestQueue(RQTestCase):
         job_2 = q.enqueue(say_hello, depends_on=parent_job)
 
         registry = DeferredJobRegistry(q.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry.get_job_ids()),
             set([job_1.id, job_2.id])
@@ -441,6 +444,9 @@
             set([job_1.id])
         )
         registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry_2.get_job_ids()),
             set([job_2.id])
@@ -573,26 +579,33 @@ class TestQueue(RQTestCase):
 
 
     def test_enqueues_dependent_if_other_dependencies_finished(self):
-        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
-        started_dependency.save()
+        parent_jobs = [Job.create(func=say_hello) for _ in
+                       range(2)]
+
+        parent_jobs[0]._status = JobStatus.STARTED
+        parent_jobs[0].save()
 
-        finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED)
-        finished_dependency.save()
+        parent_jobs[1]._status = JobStatus.FINISHED
+        parent_jobs[1].save()
 
         job_create = Job.create
 
         def create_job_patch(*args, **kwargs):
             # patch Job#create to set parent jobs as dependencies.
             job = job_create(*args, **kwargs)
-            job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]]
+            job._dependency_ids = [job.id for job in parent_jobs]
             return job
 
         q = Queue()
         with patch.object(Job, 'create', create_job_patch) as patch_create_job:
-            dependent_job = q.enqueue(say_hello, depends_on=[started_dependency])
+            # dependent job deferred, b/c parent_job 0 is still 'started'
+            dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0])
             self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
 
-            q.enqueue_dependents(started_dependency)
+            # now set parent job 0 to 'finished'
+            parent_jobs[0].set_status(JobStatus.FINISHED)
+
+            q.enqueue_dependents(parent_jobs[0])
             self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
             self.assertEqual(q.job_ids, [dependent_job.id])
 
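
Note (illustrative sketch, not part of the patch): the reordering above works
because, after pipeline.watch(job.dependents_key), redis-py executes commands
immediately until multi() is called, so the FINISHED status written by
job.set_status(..., pipeline=pipeline) is already persisted when
enqueue_dependents inspects dependency statuses. The standalone example below
demonstrates that WATCH/immediate-mode ordering with redis-py directly; the
key layout (job:<id>, job:<id>:dependents, job:<id>:deps, queue:default) and
the helper finish_and_enqueue_dependents are invented for the example and are
not rq's real schema or API.

    from redis import Redis, WatchError

    def finish_and_enqueue_dependents(r, job_id):
        """Mark job_id FINISHED, then atomically enqueue any dependent
        whose dependencies are now all finished."""
        dependents_key = 'job:{}:dependents'.format(job_id)
        with r.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(dependents_key)
                    # After watch(), commands run immediately, so this status
                    # write is visible to the dependency checks below -- the
                    # same ordering handle_job_success now uses.
                    pipe.hset('job:{}'.format(job_id), 'status', 'finished')

                    ready = []
                    for raw_id in pipe.smembers(dependents_key):
                        dependent_id = raw_id.decode()
                        deps = pipe.smembers('job:{}:deps'.format(dependent_id))
                        if all(pipe.hget('job:{}'.format(d.decode()), 'status') == b'finished'
                               for d in deps):
                            ready.append(dependent_id)

                    pipe.multi()  # buffer the writes; committed by execute()
                    for dependent_id in ready:
                        pipe.rpush('queue:default', dependent_id)
                    # Clear the finished job's dependents set; dependents that
                    # are not ready yet will be picked up when their other
                    # parents finish.
                    pipe.delete(dependents_key)
                    pipe.execute()
                    break
                except WatchError:
                    # the dependents set changed under us; retry
                    continue

    if __name__ == '__main__':
        finish_and_enqueue_dependents(Redis(), 'some-job-id')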