diff --git a/rq/worker.py b/rq/worker.py index f7c8c63..01145cb 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -503,13 +503,9 @@ class Worker(object): self.log.debug('Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within {0} seconds.'.format(timeout)) - def execute_job(self, job, queue): + def fork_work_horse(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. - The worker will wait for the work horse and make sure it executes - within the given timeout bounds, or will end the work horse with - SIGALRM. """ - self.set_state('busy') child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id @@ -518,20 +514,36 @@ class Worker(object): else: self._horse_pid = child_pid self.procline('Forked {0} at {1}'.format(child_pid, time.time())) - while True: - try: - os.waitpid(child_pid, 0) - self.set_state('idle') - break - except OSError as e: - # In case we encountered an OSError due to EINTR (which is - # caused by a SIGINT or SIGTERM signal during - # os.waitpid()), we simply ignore it and enter the next - # iteration of the loop, waiting for the child to end. In - # any other case, this is some other unexpected OS error, - # which we don't want to catch, so we re-raise those ones. - if e.errno != errno.EINTR: - raise + + def monitor_work_horse(self, job): + """The worker will wait for the work horse and make sure it executes + within the given timeout bounds, or will end the work horse with + SIGALRM. + """ + while True: + try: + _, ret_val = os.waitpid(self._horse_pid, 0) + break + except OSError as e: + # In case we encountered an OSError due to EINTR (which is + # caused by a SIGINT or SIGTERM signal during + # os.waitpid()), we simply ignore it and enter the next + # iteration of the loop, waiting for the child to end. In + # any other case, this is some other unexpected OS error, + # which we don't want to catch, so we re-raise those ones. + if e.errno != errno.EINTR: + raise + + def execute_job(self, job, queue): + """Spawns a work horse to perform the actual work and passes it a job. + The worker will wait for the work horse and make sure it executes + within the given timeout bounds, or will end the work horse with + SIGALRM. + """ + self.set_state('busy') + self.fork_work_horse(job, queue) + self.monitor_work_horse(job) + self.set_state('idle') def main_work_horse(self, job, queue): """This is the entry point of the newly spawned work horse."""