Dependent jobs are now correctly enqueued even if their parent jobs have result_ttl=0.

main
Selwin Ong 9 years ago
parent 1116089b43
commit e9d227c3df

@ -288,7 +288,7 @@ class Queue(object):
return job
def enqueue_dependents(self, job):
def enqueue_dependents(self, job, pipeline=None):
"""Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined
from .registry import DeferredJobRegistry

@ -447,12 +447,9 @@ class Worker(object):
break
job, queue = result
self.execute_job(job)
self.execute_job(job, queue)
self.heartbeat()
if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)
did_perform_work = True
finally:
@ -504,7 +501,7 @@ class Worker(object):
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
def execute_job(self, job):
def execute_job(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
@ -515,7 +512,7 @@ class Worker(object):
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job)
self.main_work_horse(job, queue)
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@ -534,7 +531,7 @@ class Worker(object):
if e.errno != errno.EINTR:
raise
def main_work_horse(self, job):
def main_work_horse(self, job, queue):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
# that are different from the worker.
@ -551,7 +548,7 @@ class Worker(object):
self._is_horse = True
self.log = logger
success = self.perform_job(job)
success = self.perform_job(job, queue)
# os._exit() is the way to exit from childs after a fork(), in
# constrast to the regular sys.exit()
@ -577,7 +574,7 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def perform_job(self, job):
def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
@ -602,9 +599,11 @@ class Worker(object):
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection)
finished_job_registry.add(job, result_ttl, pipeline)
queue.enqueue_dependents(job, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)

@ -87,7 +87,7 @@ class TestRegistry(RQTestCase):
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
worker.perform_job(job)
worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids())
# Job that fails
@ -96,7 +96,7 @@ class TestRegistry(RQTestCase):
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
worker.perform_job(job)
worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids())
def test_get_job_count(self):
@ -150,12 +150,12 @@ class TestFinishedJobRegistry(RQTestCase):
# Completed jobs are put in FinishedJobRegistry
job = queue.enqueue(say_hello)
worker.perform_job(job)
worker.perform_job(job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id])
# Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
worker.perform_job(failed_job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id])

@ -319,7 +319,7 @@ class TestWorker(RQTestCase):
"""Enqueue dependent jobs only if their parents don't fail"""
q = Queue()
w = Worker([q])
parent_job = q.enqueue(say_hello)
parent_job = q.enqueue(say_hello, result_ttl=0)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)

Loading…
Cancel
Save