|
|
|
@ -24,6 +24,20 @@ class CustomJob(Job):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MultipleDependencyJob(Job):
|
|
|
|
|
"""
|
|
|
|
|
Allows for the patching of `_dependency_ids` to simulate multi-dependency
|
|
|
|
|
support without modifying the public interface of `Job`
|
|
|
|
|
"""
|
|
|
|
|
create_job = Job.create
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def create(cls, *args, **kwargs):
|
|
|
|
|
dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids')
|
|
|
|
|
_job = cls.create_job(*args, **kwargs)
|
|
|
|
|
_job._dependency_ids = dependency_ids
|
|
|
|
|
return _job
|
|
|
|
|
|
|
|
|
|
class TestQueue(RQTestCase):
|
|
|
|
|
def test_create_queue(self):
|
|
|
|
|
"""Creating queues."""
|
|
|
|
@ -539,17 +553,10 @@ class TestQueue(RQTestCase):
|
|
|
|
|
job._status = JobStatus.QUEUED
|
|
|
|
|
job.save()
|
|
|
|
|
|
|
|
|
|
job_create = Job.create
|
|
|
|
|
|
|
|
|
|
def create_job_patch(*args, **kwargs):
|
|
|
|
|
# patch Job#create to set parent jobs as dependencies.
|
|
|
|
|
job = job_create(*args, **kwargs)
|
|
|
|
|
job._dependency_ids = [job.id for job in parent_jobs]
|
|
|
|
|
return job
|
|
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
|
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
|
|
|
|
|
job = q.enqueue(say_hello, depends_on=parent_jobs[0])
|
|
|
|
|
with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
|
|
|
|
|
job = q.enqueue(say_hello, depends_on=parent_jobs[0],
|
|
|
|
|
_dependency_ids = [job.id for job in parent_jobs])
|
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
|
|
|
|
|
self.assertEqual(q.job_ids, [])
|
|
|
|
|
self.assertEqual(job.fetch_dependencies(), parent_jobs)
|
|
|
|
@ -562,17 +569,10 @@ class TestQueue(RQTestCase):
|
|
|
|
|
job._status = JobStatus.FINISHED
|
|
|
|
|
job.save()
|
|
|
|
|
|
|
|
|
|
job_create = Job.create
|
|
|
|
|
|
|
|
|
|
def create_job_patch(*args, **kwargs):
|
|
|
|
|
# patch Job#create to set parent jobs as dependencies.
|
|
|
|
|
job = job_create(*args, **kwargs)
|
|
|
|
|
job._dependency_ids = [job.id for job in parent_jobs]
|
|
|
|
|
return job
|
|
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
|
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
|
|
|
|
|
job = q.enqueue(say_hello, depends_on=parent_jobs[0])
|
|
|
|
|
with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
|
|
|
|
|
job = q.enqueue(say_hello, depends_on=parent_jobs[0],
|
|
|
|
|
_dependency_ids=[job.id for job in parent_jobs])
|
|
|
|
|
self.assertEqual(job.get_status(), JobStatus.QUEUED)
|
|
|
|
|
self.assertEqual(q.job_ids, [job.id])
|
|
|
|
|
self.assertEqual(job.fetch_dependencies(), parent_jobs)
|
|
|
|
@ -580,7 +580,7 @@ class TestQueue(RQTestCase):
|
|
|
|
|
def test_enqueues_dependent_if_other_dependencies_finished(self):
|
|
|
|
|
|
|
|
|
|
parent_jobs = [Job.create(func=say_hello) for _ in
|
|
|
|
|
range(2)]
|
|
|
|
|
range(3)]
|
|
|
|
|
|
|
|
|
|
parent_jobs[0]._status = JobStatus.STARTED
|
|
|
|
|
parent_jobs[0].save()
|
|
|
|
@ -588,18 +588,15 @@ class TestQueue(RQTestCase):
|
|
|
|
|
parent_jobs[1]._status = JobStatus.FINISHED
|
|
|
|
|
parent_jobs[1].save()
|
|
|
|
|
|
|
|
|
|
job_create = Job.create
|
|
|
|
|
|
|
|
|
|
def create_job_patch(*args, **kwargs):
|
|
|
|
|
# patch Job#create to set parent jobs as dependencies.
|
|
|
|
|
job = job_create(*args, **kwargs)
|
|
|
|
|
job._dependency_ids = [job.id for job in parent_jobs]
|
|
|
|
|
return job
|
|
|
|
|
parent_jobs[2]._status = JobStatus.FINISHED
|
|
|
|
|
parent_jobs[2].save()
|
|
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
|
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
|
|
|
|
|
with patch('rq.queue.Job.create',
|
|
|
|
|
new=MultipleDependencyJob.create):
|
|
|
|
|
# dependent job deferred, b/c parent_job 0 is still 'started'
|
|
|
|
|
dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0])
|
|
|
|
|
dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0],
|
|
|
|
|
_dependency_ids=[job.id for job in parent_jobs])
|
|
|
|
|
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
|
|
|
|
|
|
|
|
|
|
# now set parent job 0 to 'finished'
|
|
|
|
@ -617,17 +614,10 @@ class TestQueue(RQTestCase):
|
|
|
|
|
queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
|
|
|
|
|
queued_dependency.save()
|
|
|
|
|
|
|
|
|
|
job_create = Job.create
|
|
|
|
|
|
|
|
|
|
def create_job_patch(*args, **kwargs):
|
|
|
|
|
# patch Job#create to set parent jobs as dependencies.
|
|
|
|
|
job = job_create(*args, **kwargs)
|
|
|
|
|
job._dependency_ids = [job.id for job in [started_dependency, queued_dependency]]
|
|
|
|
|
return job
|
|
|
|
|
|
|
|
|
|
q = Queue()
|
|
|
|
|
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
|
|
|
|
|
dependent_job = q.enqueue(say_hello, depends_on=[started_dependency])
|
|
|
|
|
with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
|
|
|
|
|
dependent_job = q.enqueue(say_hello, depends_on=[started_dependency],
|
|
|
|
|
_dependency_ids=[started_dependency.id, queued_dependency.id])
|
|
|
|
|
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
|
|
|
|
|
|
|
|
|
|
q.enqueue_dependents(started_dependency)
|
|
|
|
|