diff --git a/rq/job.py b/rq/job.py index c757a49..ef7a266 100644 --- a/rq/job.py +++ b/rq/job.py @@ -519,7 +519,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) - def register_dependency(self): + def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they depend on is successfully performed. We record this relation as a reverse dependency (a Redis set), with a key that looks something @@ -529,8 +529,8 @@ class Job(object): This method adds the current job in its dependency's dependents set. """ - # TODO: This can probably be pipelined - self.connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) + connection = pipeline if pipeline is not None else self.connection + connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) def __str__(self): return '' % (self.id, self.description) diff --git a/rq/queue.py b/rq/queue.py index a5c69e0..22aa160 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -188,8 +188,9 @@ class Queue(object): try: pipe.watch(depends_on.key) if depends_on.get_status() != Status.FINISHED: - job.register_dependency() - job.save() + job.register_dependency(pipeline=pipe) + job.save(pipeline=pipe) + pipe.execute() return job break except WatchError: