job.register_dependency() should support pipelining.

main
Selwin Ong 11 years ago
parent 052d0df4bf
commit fc19a64670

@ -506,7 +506,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)
def register_dependency(self): def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they """Jobs may have dependencies. Jobs are enqueued only if the job they
depend on is successfully performed. We record this relation as depend on is successfully performed. We record this relation as
a reverse dependency (a Redis set), with a key that looks something a reverse dependency (a Redis set), with a key that looks something
@ -516,8 +516,8 @@ class Job(object):
This method adds the current job in its dependency's dependents set. This method adds the current job in its dependency's dependents set.
""" """
# TODO: This can probably be pipelined connection = pipeline if pipeline is not None else self.connection
self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -179,8 +179,9 @@ class Queue(object):
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED: if depends_on.get_status() != Status.FINISHED:
job.register_dependency() job.register_dependency(pipeline=pipe)
job.save() job.save(pipeline=pipe)
pipe.execute()
return job return job
break break
except WatchError: except WatchError:

Loading…
Cancel
Save