Merge pull request #390 from selwin/pipeline-dependency-registration

job.register_dependency() should support pipelining
main
Vincent Driessen 11 years ago
commit 3de8a47f06

@ -519,7 +519,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)
def register_dependency(self): def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they """Jobs may have dependencies. Jobs are enqueued only if the job they
depend on is successfully performed. We record this relation as depend on is successfully performed. We record this relation as
a reverse dependency (a Redis set), with a key that looks something a reverse dependency (a Redis set), with a key that looks something
@ -529,8 +529,8 @@ class Job(object):
This method adds the current job in its dependency's dependents set. This method adds the current job in its dependency's dependents set.
""" """
# TODO: This can probably be pipelined connection = pipeline if pipeline is not None else self.connection
self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -188,8 +188,9 @@ class Queue(object):
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED: if depends_on.get_status() != Status.FINISHED:
job.register_dependency() job.register_dependency(pipeline=pipe)
job.save() job.save(pipeline=pipe)
pipe.execute()
return job return job
break break
except WatchError: except WatchError:

Loading…
Cancel
Save