diff --git a/rq/queue.py b/rq/queue.py index bf2017c..e5d3df8 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -288,7 +288,7 @@ class Queue(object): return job - def enqueue_dependents(self, job): + def enqueue_dependents(self, job, pipeline=None): """Enqueues all jobs in the given job's dependents set and clears it.""" # TODO: can probably be pipelined from .registry import DeferredJobRegistry diff --git a/rq/worker.py b/rq/worker.py index 5d4e74f..60683d0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -447,12 +447,9 @@ class Worker(object): break job, queue = result - self.execute_job(job) + self.execute_job(job, queue) self.heartbeat() - if job.get_status() == JobStatus.FINISHED: - queue.enqueue_dependents(job) - did_perform_work = True finally: @@ -504,7 +501,7 @@ class Worker(object): self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) - def execute_job(self, job): + def execute_job(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes within the given timeout bounds, or will end the work horse with @@ -515,7 +512,7 @@ class Worker(object): os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id if child_pid == 0: - self.main_work_horse(job) + self.main_work_horse(job, queue) else: self._horse_pid = child_pid self.procline('Forked {0} at {1}'.format(child_pid, time.time())) @@ -534,7 +531,7 @@ class Worker(object): if e.errno != errno.EINTR: raise - def main_work_horse(self, job): + def main_work_horse(self, job, queue): """This is the entry point of the newly spawned work horse.""" # After fork()'ing, always assure we are generating random sequences # that are different from the worker. @@ -551,7 +548,7 @@ class Worker(object): self._is_horse = True self.log = logger - success = self.perform_job(job) + success = self.perform_job(job, queue) # os._exit() is the way to exit from childs after a fork(), in # constrast to the regular sys.exit() @@ -577,7 +574,7 @@ class Worker(object): msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def perform_job(self, job): + def perform_job(self, job, queue): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ @@ -602,10 +599,13 @@ class Worker(object): job.set_status(JobStatus.FINISHED, pipeline=pipeline) job.save(pipeline=pipeline) - finished_job_registry = FinishedJobRegistry(job.origin, self.connection) + finished_job_registry = FinishedJobRegistry(job.origin, + self.connection) finished_job_registry.add(job, result_ttl, pipeline) - job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) + queue.enqueue_dependents(job, pipeline=pipeline) + job.cleanup(result_ttl, pipeline=pipeline, + remove_from_queue=False) started_job_registry.remove(job, pipeline=pipeline) pipeline.execute() diff --git a/tests/test_registry.py b/tests/test_registry.py index ba10258..95bb4ae 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -87,7 +87,7 @@ class TestRegistry(RQTestCase): worker.prepare_job_execution(job) self.assertIn(job.id, registry.get_job_ids()) - worker.perform_job(job) + worker.perform_job(job, queue) self.assertNotIn(job.id, registry.get_job_ids()) # Job that fails @@ -96,7 +96,7 @@ class TestRegistry(RQTestCase): worker.prepare_job_execution(job) self.assertIn(job.id, registry.get_job_ids()) - worker.perform_job(job) + worker.perform_job(job, queue) self.assertNotIn(job.id, registry.get_job_ids()) def test_get_job_count(self): @@ -150,12 +150,12 @@ class TestFinishedJobRegistry(RQTestCase): # Completed jobs are put in FinishedJobRegistry job = queue.enqueue(say_hello) - worker.perform_job(job) + worker.perform_job(job, queue) self.assertEqual(self.registry.get_job_ids(), [job.id]) # Failed jobs are not put in FinishedJobRegistry failed_job = queue.enqueue(div_by_zero) - worker.perform_job(failed_job) + worker.perform_job(failed_job, queue) self.assertEqual(self.registry.get_job_ids(), [job.id]) diff --git a/tests/test_worker.py b/tests/test_worker.py index f135ee5..2762459 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -325,7 +325,7 @@ class TestWorker(RQTestCase): """Enqueue dependent jobs only if their parents don't fail""" q = Queue() w = Worker([q]) - parent_job = q.enqueue(say_hello) + parent_job = q.enqueue(say_hello, result_ttl=0) job = q.enqueue_call(say_hello, depends_on=parent_job) w.work(burst=True) job = Job.fetch(job.id)