Alway set status 'FINISHED' when job is Successful

Method Queue#enqueue_dependents checks the status of all dependencies of all dependents, and enqueues those dependents for which all dependencies are FINISHED.

The enqueue_dependents method WAS called from Worker#handle_job_success called BEFORE the status of the successful job was set in Redis, so enqueue_dependents explicitly excluded the _successful_ job from interrogation of dependency statuses as the it would never be true in the existing code path, but it was assumed that this would be final status after the current pipeline was executed.

This commit changes Worker#handle_job_success so that it persists the status of the successful job to Redis, everytime a job completes(not only if it has a ttl) and does so before enqueue_dependents is called. This allows for enqueue_dependents to be less reliant on the out of band state of the current _successful job being handled_.
main
thomas committed by Thomas Matecki
parent 4a64244f40
commit 7ea5a32a55

@ -480,7 +480,6 @@ class Queue(object):
status == JobStatus.FINISHED status == JobStatus.FINISHED
for job_id, status for job_id, status
in dependents_dependencies in dependents_dependencies
if job_id != job.id
): ):
continue continue

@ -15,20 +15,15 @@ import warnings
from datetime import timedelta from datetime import timedelta
from uuid import uuid4 from uuid import uuid4
try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
from redis import WatchError from redis import WatchError
from . import worker_registration from . import worker_registration
from .compat import PY2, as_text, string_types, text_type from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import (get_current_connection, pop_connection,
push_connection)
from .defaults import (DEFAULT_RESULT_TTL, from .defaults import (DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL)
from .exceptions import DequeueTimeout, ShutDownImminentException from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
@ -36,13 +31,22 @@ from .queue import Queue
from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
from .scheduler import RQScheduler from .scheduler import RQScheduler
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import (HorseMonitorTimeoutException, JobTimeoutException,
from .utils import (backend_class, ensure_list, enum, UnixSignalDeathPenalty)
make_colorizer, utcformat, utcnow, utcparse) from .utils import (backend_class, ensure_list, enum, make_colorizer,
utcformat, utcnow, utcparse)
from .version import VERSION from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer from .serializers import resolve_serializer
try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
try: try:
from setproctitle import setproctitle as setprocname from setproctitle import setproctitle as setprocname
except ImportError: except ImportError:
@ -845,6 +849,7 @@ class Worker(object):
# if dependencies are inserted after enqueue_dependents # if dependencies are inserted after enqueue_dependents
# a WatchError is thrown by execute() # a WatchError is thrown by execute()
pipeline.watch(job.dependents_key) pipeline.watch(job.dependents_key)
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# enqueue_dependents calls multi() on the pipeline! # enqueue_dependents calls multi() on the pipeline!
queue.enqueue_dependents(job, pipeline=pipeline) queue.enqueue_dependents(job, pipeline=pipeline)
@ -856,7 +861,6 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl) result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0: if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
# Don't clobber the user's meta dictionary! # Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False) job.save(pipeline=pipeline, include_meta=False)

@ -411,6 +411,9 @@ class TestQueue(RQTestCase):
job_2 = q.enqueue(say_hello, depends_on=parent_job) job_2 = q.enqueue(say_hello, depends_on=parent_job)
registry = DeferredJobRegistry(q.name, connection=self.testconn) registry = DeferredJobRegistry(q.name, connection=self.testconn)
parent_job.set_status(JobStatus.FINISHED)
self.assertEqual( self.assertEqual(
set(registry.get_job_ids()), set(registry.get_job_ids()),
set([job_1.id, job_2.id]) set([job_1.id, job_2.id])
@ -441,6 +444,9 @@ class TestQueue(RQTestCase):
set([job_1.id]) set([job_1.id])
) )
registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
parent_job.set_status(JobStatus.FINISHED)
self.assertEqual( self.assertEqual(
set(registry_2.get_job_ids()), set(registry_2.get_job_ids()),
set([job_2.id]) set([job_2.id])
@ -573,26 +579,33 @@ class TestQueue(RQTestCase):
def test_enqueues_dependent_if_other_dependencies_finished(self): def test_enqueues_dependent_if_other_dependencies_finished(self):
started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) parent_jobs = [Job.create(func=say_hello) for _ in
started_dependency.save() range(2)]
parent_jobs[0]._status = JobStatus.STARTED
parent_jobs[0].save()
finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED) parent_jobs[1]._status = JobStatus.FINISHED
finished_dependency.save() parent_jobs[1].save()
job_create = Job.create job_create = Job.create
def create_job_patch(*args, **kwargs): def create_job_patch(*args, **kwargs):
# patch Job#create to set parent jobs as dependencies. # patch Job#create to set parent jobs as dependencies.
job = job_create(*args, **kwargs) job = job_create(*args, **kwargs)
job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]] job._dependency_ids = [job.id for job in parent_jobs]
return job return job
q = Queue() q = Queue()
with patch.object(Job, 'create', create_job_patch) as patch_create_job: with patch.object(Job, 'create', create_job_patch) as patch_create_job:
dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) # dependent job deferred, b/c parent_job 0 is still 'started'
dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0])
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
q.enqueue_dependents(started_dependency) # now set parent job 0 to 'finished'
parent_jobs[0].set_status(JobStatus.FINISHED)
q.enqueue_dependents(parent_jobs[0])
self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED) self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
self.assertEqual(q.job_ids, [dependent_job.id]) self.assertEqual(q.job_ids, [dependent_job.id])

Loading…
Cancel
Save