diff --git a/rq/queue.py b/rq/queue.py index a354d1c..57aa3a6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -480,7 +480,6 @@ class Queue(object): status == JobStatus.FINISHED for job_id, status in dependents_dependencies - if job_id != job.id ): continue diff --git a/rq/worker.py b/rq/worker.py index 7e090f5..553b457 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -15,20 +15,15 @@ import warnings from datetime import timedelta from uuid import uuid4 -try: - from signal import SIGKILL -except ImportError: - from signal import SIGTERM as SIGKILL - from redis import WatchError from . import worker_registration from .compat import PY2, as_text, string_types, text_type -from .connections import get_current_connection, push_connection, pop_connection - -from .defaults import (DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) +from .connections import (get_current_connection, pop_connection, + push_connection) +from .defaults import (DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, + DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL) from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -36,13 +31,22 @@ from .queue import Queue from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended -from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import (backend_class, ensure_list, enum, - make_colorizer, utcformat, utcnow, utcparse) +from .timeouts import (HorseMonitorTimeoutException, JobTimeoutException, + UnixSignalDeathPenalty) +from .utils import (backend_class, ensure_list, enum, make_colorizer, + utcformat, utcnow, utcparse) from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer +try: + from signal import SIGKILL +except ImportError: + from signal import SIGTERM as SIGKILL + + + + try: from setproctitle import setproctitle as setprocname except ImportError: @@ -845,6 +849,7 @@ class Worker(object): # if dependencies are inserted after enqueue_dependents # a WatchError is thrown by execute() pipeline.watch(job.dependents_key) + job.set_status(JobStatus.FINISHED, pipeline=pipeline) # enqueue_dependents calls multi() on the pipeline! queue.enqueue_dependents(job, pipeline=pipeline) @@ -856,7 +861,6 @@ class Worker(object): result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - job.set_status(JobStatus.FINISHED, pipeline=pipeline) # Don't clobber the user's meta dictionary! job.save(pipeline=pipeline, include_meta=False) diff --git a/tests/test_queue.py b/tests/test_queue.py index 4b43677..f776747 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -411,6 +411,9 @@ class TestQueue(RQTestCase): job_2 = q.enqueue(say_hello, depends_on=parent_job) registry = DeferredJobRegistry(q.name, connection=self.testconn) + + parent_job.set_status(JobStatus.FINISHED) + self.assertEqual( set(registry.get_job_ids()), set([job_1.id, job_2.id]) @@ -441,6 +444,9 @@ class TestQueue(RQTestCase): set([job_1.id]) ) registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn) + + parent_job.set_status(JobStatus.FINISHED) + self.assertEqual( set(registry_2.get_job_ids()), set([job_2.id]) @@ -573,26 +579,33 @@ class TestQueue(RQTestCase): def test_enqueues_dependent_if_other_dependencies_finished(self): - started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) - started_dependency.save() + parent_jobs = [Job.create(func=say_hello) for _ in + range(2)] + + parent_jobs[0]._status = JobStatus.STARTED + parent_jobs[0].save() - finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED) - finished_dependency.save() + parent_jobs[1]._status = JobStatus.FINISHED + parent_jobs[1].save() job_create = Job.create def create_job_patch(*args, **kwargs): # patch Job#create to set parent jobs as dependencies. job = job_create(*args, **kwargs) - job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]] + job._dependency_ids = [job.id for job in parent_jobs] return job q = Queue() with patch.object(Job, 'create', create_job_patch) as patch_create_job: - dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + # dependent job deferred, b/c parent_job 0 is still 'started' + dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0]) self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) - q.enqueue_dependents(started_dependency) + # now set parent job 0 to 'finished' + parent_jobs[0].set_status(JobStatus.FINISHED) + + q.enqueue_dependents(parent_jobs[0]) self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED) self.assertEqual(q.job_ids, [dependent_job.id])