Added "DEFERRED" Job status for jobs that have unsatisfied dependencies.

main
Selwin Ong 10 years ago
parent dc5cd514ee
commit 7fd2ac8ca6

@ -25,9 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads loads = pickle.loads
JobStatus = enum('JobStatus', JobStatus = enum(
QUEUED='queued', FINISHED='finished', FAILED='failed', 'JobStatus',
STARTED='started') QUEUED='queued',
FINISHED='finished',
FAILED='failed',
STARTED='started',
DEFERRED='deferred'
)
# Sentinel value to mark that some of our lazily evaluated properties have not # Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated. # yet been evaluated.

@ -182,7 +182,6 @@ class Queue(object):
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=JobStatus.QUEUED, result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout, description=description, depends_on=depends_on, timeout=timeout,
@ -200,6 +199,7 @@ class Queue(object):
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
if depends_on.get_status() != JobStatus.FINISHED: if depends_on.get_status() != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED)
job.register_dependency(pipeline=pipe) job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe) job.save(pipeline=pipe)
pipe.execute() pipe.execute()

@ -342,8 +342,9 @@ class TestQueue(RQTestCase):
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
# Jobs dependent on finished jobs are immediately enqueued # Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(JobStatus.FINISHED) parent_job.set_status(JobStatus.FINISHED)
@ -351,6 +352,7 @@ class TestQueue(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects.""" """Enqueueing jobs should work as expected by id as well as job-objects."""
@ -368,7 +370,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
def test_enqueue_job_with_dependency_and_timeout(self): def test_enqueue_job_with_dependency_and_timeout(self):
"""Jobs still know their specified timeout after being scheduled as a dependency.""" """Jobs remember their timeout when enqueued as a dependency."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()

Loading…
Cancel
Save