diff --git a/rq/job.py b/rq/job.py index 29562d6..c3ae913 100644 --- a/rq/job.py +++ b/rq/job.py @@ -506,7 +506,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) - def register_dependency(self): + def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they depend on is successfully performed. We record this relation as a reverse dependency (a Redis set), with a key that looks something @@ -516,8 +516,8 @@ class Job(object): This method adds the current job in its dependency's dependents set. """ - # TODO: This can probably be pipelined - self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) + connection = pipeline if pipeline is not None else self.connection + connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) def __str__(self): return '' % (self.id, self.description) diff --git a/rq/queue.py b/rq/queue.py index 4296001..1b2dd04 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -179,8 +179,9 @@ class Queue(object): try: pipe.watch(depends_on.key) if depends_on.get_status() != Status.FINISHED: - job.register_dependency() - job.save() + job.register_dependency(pipeline=pipe) + job.save(pipeline=pipe) + pipe.execute() return job break except WatchError: