Merge branch 'dependent-jobs-with-result-ttl'

main
Selwin Ong 9 years ago
commit 766bb60006

@ -288,7 +288,7 @@ class Queue(object):
return job return job
def enqueue_dependents(self, job): def enqueue_dependents(self, job, pipeline=None):
"""Enqueues all jobs in the given job's dependents set and clears it.""" """Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined # TODO: can probably be pipelined
from .registry import DeferredJobRegistry from .registry import DeferredJobRegistry

@ -447,12 +447,9 @@ class Worker(object):
break break
job, queue = result job, queue = result
self.execute_job(job) self.execute_job(job, queue)
self.heartbeat() self.heartbeat()
if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)
did_perform_work = True did_perform_work = True
finally: finally:
@ -504,7 +501,7 @@ class Worker(object):
self.log.debug('Sent heartbeat to prevent worker timeout. ' self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout)) 'Next one should arrive within {0} seconds.'.format(timeout))
def execute_job(self, job): def execute_job(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job. """Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with within the given timeout bounds, or will end the work horse with
@ -515,7 +512,7 @@ class Worker(object):
os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0: if child_pid == 0:
self.main_work_horse(job) self.main_work_horse(job, queue)
else: else:
self._horse_pid = child_pid self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time())) self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@ -534,7 +531,7 @@ class Worker(object):
if e.errno != errno.EINTR: if e.errno != errno.EINTR:
raise raise
def main_work_horse(self, job): def main_work_horse(self, job, queue):
"""This is the entry point of the newly spawned work horse.""" """This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences # After fork()'ing, always assure we are generating random sequences
# that are different from the worker. # that are different from the worker.
@ -551,7 +548,7 @@ class Worker(object):
self._is_horse = True self._is_horse = True
self.log = logger self.log = logger
success = self.perform_job(job) success = self.perform_job(job, queue)
# os._exit() is the way to exit from children after a fork(), in # os._exit() is the way to exit from children after a fork(), in
# contrast to the regular sys.exit() # contrast to the regular sys.exit()
@ -577,7 +574,7 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}' msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time())) self.procline(msg.format(job.func_name, job.origin, time.time()))
def perform_job(self, job): def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called """Performs the actual work of a job. Will/should only be called
inside the work horse's process. inside the work horse's process.
""" """
@ -602,10 +599,13 @@ class Worker(object):
job.set_status(JobStatus.FINISHED, pipeline=pipeline) job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline) job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin, self.connection) finished_job_registry = FinishedJobRegistry(job.origin,
self.connection)
finished_job_registry.add(job, result_ttl, pipeline) finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) queue.enqueue_dependents(job, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline) started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute() pipeline.execute()

@ -87,7 +87,7 @@ class TestRegistry(RQTestCase):
worker.prepare_job_execution(job) worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids()) self.assertIn(job.id, registry.get_job_ids())
worker.perform_job(job) worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids()) self.assertNotIn(job.id, registry.get_job_ids())
# Job that fails # Job that fails
@ -96,7 +96,7 @@ class TestRegistry(RQTestCase):
worker.prepare_job_execution(job) worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids()) self.assertIn(job.id, registry.get_job_ids())
worker.perform_job(job) worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids()) self.assertNotIn(job.id, registry.get_job_ids())
def test_get_job_count(self): def test_get_job_count(self):
@ -150,12 +150,12 @@ class TestFinishedJobRegistry(RQTestCase):
# Completed jobs are put in FinishedJobRegistry # Completed jobs are put in FinishedJobRegistry
job = queue.enqueue(say_hello) job = queue.enqueue(say_hello)
worker.perform_job(job) worker.perform_job(job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id]) self.assertEqual(self.registry.get_job_ids(), [job.id])
# Failed jobs are not put in FinishedJobRegistry # Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero) failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job) worker.perform_job(failed_job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id]) self.assertEqual(self.registry.get_job_ids(), [job.id])

@ -325,7 +325,7 @@ class TestWorker(RQTestCase):
"""Enqueue dependent jobs only if their parents don't fail""" """Enqueue dependent jobs only if their parents don't fail"""
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
parent_job = q.enqueue(say_hello) parent_job = q.enqueue(say_hello, result_ttl=0)
job = q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id) job = Job.fetch(job.id)

Loading…
Cancel
Save