Internally renamed the term "parent" to "dependency".

main
Selwin Ong 12 years ago
parent 0dfb041383
commit 2e826e2b1f

@ -64,7 +64,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, parent=None): result_ttl=None, status=None, dependency=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -87,9 +87,9 @@ class Job(object):
job.description = job.get_call_string() job.description = job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job._status = status job._status = status
# parent could be job instance or id # dependency could be job instance or id
if parent is not None: if dependency is not None:
job._parent_id = parent.id if isinstance(parent, Job) else parent job._dependency_id = dependency.id if isinstance(dependency, Job) else dependency
return job return job
@property @property
@ -123,17 +123,17 @@ class Job(object):
return self.status == Status.STARTED return self.status == Status.STARTED
@property @property
def parent(self): def dependency(self):
"""Returns a job's parent. To avoid repeated Redis fetches, we cache """Returns a job's dependency. To avoid repeated Redis fetches, we cache
job.parent as job._parent. job.dependency as job._dependency.
""" """
if self._parent_id is None: if self._dependency_id is None:
return None return None
if hasattr(self, '_parent'): if hasattr(self, '_dependency'):
return self._parent return self._dependency
job = Job.fetch(self._parent_id, connection=self.connection) job = Job.fetch(self._dependency_id, connection=self.connection)
job.refresh() job.refresh()
self._parent = job self._dependency = job
return job return job
@property @property
@ -202,7 +202,7 @@ class Job(object):
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self._status = None self._status = None
self._parent_id = None self._dependency_id = None
self.meta = {} self.meta = {}
@ -313,7 +313,7 @@ class Job(object):
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None self._status = obj.get('status') if obj.get('status') else None
self._parent_id = obj.get('parent_id', None) self._dependency_id = obj.get('dependency_id', None)
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self, pipeline=None): def save(self, pipeline=None):
@ -344,8 +344,8 @@ class Job(object):
obj['result_ttl'] = self.result_ttl obj['result_ttl'] = self.result_ttl
if self._status is not None: if self._status is not None:
obj['status'] = self._status obj['status'] = self._status
if self._parent_id is not None: if self._dependency_id is not None:
obj['parent_id'] = self._parent_id obj['dependency_id'] = self._dependency_id
if self.meta: if self.meta:
obj['meta'] = dumps(self.meta) obj['meta'] = dumps(self.meta)
@ -415,15 +415,15 @@ class Job(object):
def register_dependency(self): def register_dependency(self):
"""Jobs may have a waitlist. Jobs in this waitlist are enqueued """Jobs may have a waitlist. Jobs in this waitlist are enqueued
only if the parent job is successfully performed. We maintain this only if the dependency job is successfully performed. We maintain this
waitlist in Redis, with key that looks something like: waitlist in Redis, with key that looks something like:
rq:job:job_id:waitlist = ['job_id_1', 'job_id_2'] rq:job:job_id:waitlist = ['job_id_1', 'job_id_2']
This method puts the job on it's parent's waitlist. This method puts the job on it's dependency's waitlist.
""" """
# TODO: This can probably be pipelined # TODO: This can probably be pipelined
self.connection.rpush(Job.waitlist_key_for(self._parent_id), self.id) self.connection.rpush(Job.waitlist_key_for(self._dependency_id), self.id)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)
@ -459,7 +459,7 @@ class Job(object):
'description', '_args', 'created_at', 'enqueued_at', 'connection', 'description', '_args', 'created_at', 'enqueued_at', 'connection',
'_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id',
'data', '_instance', 'result_ttl', '_status', 'status', 'data', '_instance', 'result_ttl', '_status', 'status',
'_parent_id', '_parent', 'parent', 'meta')) '_dependency_id', '_dependency', 'dependency', 'meta'))
if name in private_attrs: if name in private_attrs:
object.__setattr__(self, name, value) object.__setattr__(self, name, value)

@ -128,7 +128,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
parent=after) dependency=after)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it # parent's waitlist instead of enqueueing it

@ -135,17 +135,17 @@ class TestJob(RQTestCase):
"""Storing jobs with parent job, either instance or key.""" """Storing jobs with parent job, either instance or key."""
parent_job = Job.create(func=some_calculation) parent_job = Job.create(func=some_calculation)
parent_job.save() parent_job.save()
job = Job.create(func=some_calculation, parent=parent_job) job = Job.create(func=some_calculation, dependency=parent_job)
job.save() job.save()
stored_job = Job.fetch(job.id) stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._parent_id, parent_job.id) self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.parent, parent_job) self.assertEqual(stored_job.dependency, parent_job)
job = Job.create(func=some_calculation, parent=parent_job.id) job = Job.create(func=some_calculation, dependency=parent_job.id)
job.save() job.save()
stored_job = Job.fetch(job.id) stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._parent_id, parent_job.id) self.assertEqual(stored_job._dependency_id, parent_job.id)
self.assertEqual(stored_job.parent, parent_job) self.assertEqual(stored_job.dependency, parent_job)
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch.""" """Store, then fetch."""
@ -269,7 +269,7 @@ class TestJob(RQTestCase):
def test_register_dependency(self): def test_register_dependency(self):
"""Test that jobs updates the correct job waitlist""" """Test that jobs updates the correct job waitlist"""
job = Job.create(func=say_hello) job = Job.create(func=say_hello)
job._parent_id = 'id' job._dependency_id = 'id'
job.save() job.save()
job.register_dependency() job.register_dependency()
self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id) self.assertEqual(self.testconn.lpop('rq:job:id:waitlist'), job.id)

@ -242,10 +242,10 @@ class TestQueue(RQTestCase):
q = Queue() q = Queue()
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save() parent_job.save()
job_1 = Job.create(func=say_hello, parent=parent_job) job_1 = Job.create(func=say_hello, dependency=parent_job)
job_1.save() job_1.save()
job_1.register_dependency() job_1.register_dependency()
job_2 = Job.create(func=say_hello, parent=parent_job) job_2 = Job.create(func=say_hello, dependency=parent_job)
job_2.save() job_2.save()
job_2.register_dependency() job_2.register_dependency()

Loading…
Cancel
Save