Merge pull request #710 from spiliopoulos/fix_issue_702

Fix issue 702
main
Selwin Ong 9 years ago committed by GitHub
commit 8da79c1aa7

@ -507,13 +507,9 @@ class Worker(object):
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
def execute_job(self, job, queue):
def fork_work_horse(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
self.set_state('busy')
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
@ -522,10 +518,44 @@ class Worker(object):
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def monitor_work_horse(self, job):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
"""
while True:
try:
os.waitpid(child_pid, 0)
self.set_state('idle')
_, ret_val = os.waitpid(self._horse_pid, 0)
if ret_val != os.EX_OK:
job_status = job.get_status()
if job_status is None:
# Job completed and its ttl has expired
break
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
with self.connection._pipeline() as pipeline:
self.handle_job_failure(
job=job,
pipeline=pipeline
)
try:
pipeline.execute()
except Exception:
pass
#Unhandled failure: move the job to the failed queue
self.log.warning(
'Moving job to {0!r} queue'.format(
self.failed_queue.name
)
)
self.failed_queue.quarantine(
job,
exc_info=(
"Work-horse proccess "
"was terminated unexpectedly"
)
)
break
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
@ -537,6 +567,17 @@ class Worker(object):
if e.errno != errno.EINTR:
raise
def execute_job(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
self.set_state('busy')
self.fork_work_horse(job, queue)
self.monitor_work_horse(job)
self.set_state('idle')
def main_work_horse(self, job, queue):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
@ -584,6 +625,27 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_failure(
self,
job,
started_job_registry=None,
pipeline=None
):
"""Handles the failure or an executing job by:
1. Setting the job status to failed
2. Removing the job from the started_job_registry
3. Setting the workers current job to None
"""
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
job.origin,
self.connection
)
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
@ -624,9 +686,11 @@ class Worker(object):
pipeline.execute()
except Exception:
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
self.handle_job_failure(
job=job,
started_job_registry=started_job_registry,
pipeline=pipeline
)
try:
pipeline.execute()
except Exception:

@ -14,7 +14,8 @@ import subprocess
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid,
run_dummy_heroku_worker, access_self)
run_dummy_heroku_worker, access_self,
long_running_job)
from tests.helpers import strip_microseconds
from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@ -577,6 +578,10 @@ def kill_worker(pid, double_kill):
time.sleep(0.5)
os.kill(pid, signal.SIGTERM)
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
os.kill(pid, signal.SIGKILL)
class TimeoutTestCase:
def setUp(self):
@ -649,6 +654,30 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertIsNotNone(shutdown_requested_date)
self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
@slow
def test_work_horse_death_sets_job_failed(self):
"""worker with an ongoing job whose work horse dies unexpectadly (before
completing the job) should set the job's status to FAILED
"""
fooq = Queue('foo')
failed_q = get_failed_queue()
self.assertEqual(failed_q.count, 0)
self.assertEqual(fooq.count, 0)
w = Worker(fooq)
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file):
os.remove(sentinel_file)
fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
job, queue = w.dequeue_job_and_maintain_ttl(5)
w.fork_work_horse(job, queue)
p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5))
p.start()
w.monitor_work_horse(job)
job_status = job.get_status()
p.join(1)
self.assertEqual(job_status, JobStatus.FAILED)
self.assertEqual(failed_q.count, 1)
self.assertEqual(fooq.count, 0)
def schedule_access_self():
q = Queue('default', connection=get_current_connection())

Loading…
Cancel
Save