Replaced "after" kwarg with "depends_on".

main
Selwin Ong 11 years ago
parent fa0e9e0f95
commit 93e5e552b7

@ -69,7 +69,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, dependency=None): result_ttl=None, status=None, description=None, depends_on=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -93,8 +93,8 @@ class Job(object):
job.result_ttl = result_ttl job.result_ttl = result_ttl
job._status = status job._status = status
# dependency could be job instance or id # dependency could be job instance or id
if dependency is not None: if depends_on is not None:
job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
return job return job
@property @property

@ -140,7 +140,7 @@ class Queue(object):
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, after=None): result_ttl=None, description=None, depends_on=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -153,18 +153,18 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, dependency=after) description=description, depends_on=depends_on)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it. # parent's waitlist instead of enqueueing it.
# If WatchError is raised in the process, that means something else is # If WatchError is raised in the process, that means something else is
# modifying the dependency. In this case we simply retry # modifying the dependency. In this case we simply retry
if after is not None: if depends_on is not None:
with self.connection.pipeline() as pipe: with self.connection.pipeline() as pipe:
while True: while True:
try: try:
pipe.watch(after.key) pipe.watch(depends_on.key)
if after.status != Status.FINISHED: if depends_on.status != Status.FINISHED:
job.register_dependency() job.register_dependency()
job.save() job.save()
return job return job
@ -197,19 +197,19 @@ class Queue(object):
timeout = None timeout = None
description = None description = None
result_ttl = None result_ttl = None
after = None depends_on = None
if 'args' in kwargs or 'kwargs' in kwargs or 'after' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs or 'depends_on' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None) timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
after = kwargs.pop('after', None) depends_on = kwargs.pop('depends_on', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl,
description=description, after=after) description=description, depends_on=depends_on)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.

@ -139,13 +139,13 @@ class TestJob(RQTestCase):
"""Storing jobs with parent job, either instance or key.""" """Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=some_calculation) parent_job = Job.create(func=some_calculation)
parent_job.save() parent_job.save()
job = Job.create(func=some_calculation, dependency=parent_job) job = Job.create(func=some_calculation, depends_on=parent_job)
job.save() job.save()
stored_job = Job.fetch(job.id) stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id) self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job) self.assertEqual(stored_job.dependency, parent_job)
job = Job.create(func=some_calculation, dependency=parent_job.id) job = Job.create(func=some_calculation, depends_on=parent_job.id)
job.save() job.save()
stored_job = Job.fetch(job.id) stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id) self.assertEqual(stored_job._dependency_id, parent_job.id)

@ -294,10 +294,10 @@ class TestQueue(RQTestCase):
q = Queue() q = Queue()
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save() parent_job.save()
job_1 = Job.create(func=say_hello, dependency=parent_job) job_1 = Job.create(func=say_hello, depends_on=parent_job)
job_1.save() job_1.save()
job_1.register_dependency() job_1.register_dependency()
job_2 = Job.create(func=say_hello, dependency=parent_job) job_2 = Job.create(func=say_hello, depends_on=parent_job)
job_2.save() job_2.save()
job_2.register_dependency() job_2.register_dependency()
@ -312,13 +312,13 @@ class TestQueue(RQTestCase):
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
q.enqueue_call(say_hello, after=parent_job) q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued # Jobs dependent on finished jobs are immediately enqueued
parent_job.status = 'finished' parent_job.status = 'finished'
parent_job.save() parent_job.save()
job = q.enqueue_call(say_hello, after=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])

@ -240,13 +240,13 @@ class TestWorker(RQTestCase):
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
parent_job = q.enqueue(say_hello) parent_job = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, after=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEqual(job.status, 'finished') self.assertEqual(job.status, 'finished')
parent_job = q.enqueue(div_by_zero) parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, after=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertNotEqual(job.status, 'finished') self.assertNotEqual(job.status, 'finished')

Loading…
Cancel
Save