diff --git a/tests/test_worker.py b/tests/test_worker.py
index b753274..f09a226 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -11,6 +11,8 @@ import time
 from multiprocessing import Process
 import subprocess
 
+import mock
+
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
                             div_by_zero, do_nothing, say_hello, say_pid,
@@ -567,6 +569,40 @@ class TestWorker(RQTestCase):
         worker.work(burst=True)
         self.assertEqual(self.testconn.zcard(registry.key), 0)
 
+    def test_job_dependency_race_condition(self):
+        """Dependencies added while the job is being finished shouldn't get lost."""
+
+        # This patches Queue.enqueue_dependents to enqueue a new dependent job
+        # AFTER the original code has run.
+        orig_enqueue_dependents = Queue.enqueue_dependents
+
+        def new_enqueue_dependents(self, job, *args, **kwargs):
+            orig_enqueue_dependents(self, job, *args, **kwargs)
+            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id:
+                Queue._add_enqueue = None
+                Queue().enqueue_call(say_hello, depends_on=job)
+
+        Queue.enqueue_dependents = new_enqueue_dependents
+
+        q = Queue()
+        w = Worker([q])
+        with mock.patch.object(Worker, 'execute_job', wraps=w.execute_job) as mocked:
+            parent_job = q.enqueue(say_hello, result_ttl=0)
+            Queue._add_enqueue = parent_job
+            job = q.enqueue_call(say_hello, depends_on=parent_job)
+            w.work(burst=True)
+            job = Job.fetch(job.id)
+            self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+        # The spy checks two issues:
+        # * before the fix for #739, only 2 of the 3 jobs were executed
+        #   because of the race condition
+        # * during development another issue was fixed: due to a missing
+        #   pipeline usage in Queue.enqueue_job, the job that was enqueued
+        #   before the "rollback" was executed twice, so before that fix
+        #   the call count was 4 instead of 3
+        self.assertEqual(mocked.call_count, 3)
+
 
 def kill_worker(pid, double_kill):
     # wait for the worker to be started over on the main process
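
The comment in the test above attributes the 4-instead-of-3 call count to a missing pipeline usage in Queue.enqueue_job. As a rough sketch of the general pattern involved (not RQ's actual implementation; the "example:*" key names and the push_to_queue helper are invented for illustration), a Redis WATCH/MULTI pipeline can make "read the dependents of a finished job and enqueue them" atomic, so a dependent registered concurrently either aborts the transaction and triggers a retry, or stays on the set for a later pass instead of being silently dropped:

# Illustration only -- not RQ's code. Assumed/hypothetical pieces: the
# "example:*" key names and the push_to_queue() helper.
from redis import StrictRedis, WatchError


def push_to_queue(pipe, job_id):
    # Hypothetical helper: buffer the enqueue inside the open transaction.
    pipe.rpush('example:queue:default', job_id)


def enqueue_dependents_atomically(conn, job_id):
    dependents_key = 'example:job:%s:dependents' % job_id
    with conn.pipeline() as pipe:
        while True:
            try:
                # WATCH the dependents set; if another client adds a dependent
                # after we read it, EXEC raises WatchError and we retry with
                # fresh state, so no concurrently added dependent is lost.
                pipe.watch(dependents_key)
                dependent_ids = pipe.smembers(dependents_key)
                pipe.multi()
                for dep_id in dependent_ids:
                    push_to_queue(pipe, dep_id)
                pipe.delete(dependents_key)
                pipe.execute()  # atomic: enqueue all dependents + clear the set
                break
            except WatchError:
                continue


if __name__ == '__main__':
    # Requires a local Redis server; registers one dependent, then drains it.
    conn = StrictRedis()
    conn.sadd('example:job:1234:dependents', 'abcd')
    enqueue_dependents_atomically(conn, '1234')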