Split execute job to expose issue 702

In order to create a test for issue 702 we had to split execute_job
to a fork_work_horse function and a monitor_work_horse function.
main
Yannis Spiliopoulos 9 years ago
parent c00d3681f9
commit 93d286a6c7

@ -503,13 +503,9 @@ class Worker(object):
self.log.debug('Sent heartbeat to prevent worker timeout. ' self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout)) 'Next one should arrive within {0} seconds.'.format(timeout))
def execute_job(self, job, queue): def fork_work_horse(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job. """Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
""" """
self.set_state('busy')
child_pid = os.fork() child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id os.environ['RQ_JOB_ID'] = job.id
@ -518,10 +514,15 @@ class Worker(object):
else: else:
self._horse_pid = child_pid self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time())) self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def monitor_work_horse(self, job):
"""The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
while True: while True:
try: try:
os.waitpid(child_pid, 0) _, ret_val = os.waitpid(self._horse_pid, 0)
self.set_state('idle')
break break
except OSError as e: except OSError as e:
# In case we encountered an OSError due to EINTR (which is # In case we encountered an OSError due to EINTR (which is
@ -533,6 +534,17 @@ class Worker(object):
if e.errno != errno.EINTR: if e.errno != errno.EINTR:
raise raise
def execute_job(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
self.set_state('busy')
self.fork_work_horse(job, queue)
self.monitor_work_horse(job)
self.set_state('idle')
def main_work_horse(self, job, queue): def main_work_horse(self, job, queue):
"""This is the entry point of the newly spawned work horse.""" """This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences # After fork()'ing, always assure we are generating random sequences

Loading…
Cancel
Save