From 67ae9277e5cc1aea5aa3c4aad9b0de254ab55ea2 Mon Sep 17 00:00:00 2001 From: Stefan Hammer Date: Sat, 13 Aug 2016 20:22:20 +0200 Subject: [PATCH] completely pipelined enqueue_dependents --- rq/job.py | 2 +- rq/queue.py | 60 +++++++++++++++++++++++++++++++++++++--------------- rq/worker.py | 3 +-- 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/rq/job.py b/rq/job.py index ce1894b..d037374 100644 --- a/rq/job.py +++ b/rq/job.py @@ -543,7 +543,7 @@ class Job(object): forever) """ if ttl == 0: - self.delete(remove_from_queue=remove_from_queue) + self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return elif ttl > 0: diff --git a/rq/queue.py b/rq/queue.py index 8f288ca..1e5b7e4 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -280,35 +280,61 @@ class Queue(object): job.timeout = self.DEFAULT_TIMEOUT job.save(pipeline=pipe) + if self._async: + self.push_job_id(job.id, pipeline=pipe, at_front=at_front) + if pipeline is None: pipe.execute() - if self._async: - self.push_job_id(job.id, at_front=at_front) - return job def enqueue_dependents(self, job, pipeline=None): - """Enqueues all jobs in the given job's dependents set and clears it.""" - # TODO: can probably be pipelined + """Enqueues all jobs in the given job's dependents set and clears it. + + When called without a pipeline, this method uses WATCH/MULTI/EXEC. + If you pass a pipeline, only MULTI is called. The rest is up to the + caller. 
+ """ from .registry import DeferredJobRegistry - dependents_connection = pipeline if pipeline is not None else self.connection + pipe = pipeline if pipeline is not None else self.connection._pipeline() + dependents_key = job.dependents_key while True: - job_id = as_text(dependents_connection.spop(job.dependents_key)) - if job_id is None: + try: + # if a pipeline is passed, the caller is responsible for calling WATCH + # to ensure all jobs are enqueued + if pipeline is None: + pipe.watch(dependents_key) + + dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) + for job_id in pipe.smembers(dependents_key)] + + pipe.multi() + + for dependent in dependent_jobs: + registry = DeferredJobRegistry(dependent.origin, self.connection) + registry.remove(dependent, pipeline=pipe) + if dependent.origin == self.name: + self.enqueue_job(dependent, pipeline=pipe) + else: + queue = Queue(name=dependent.origin, connection=self.connection) + queue.enqueue_job(dependent, pipeline=pipe) + + pipe.delete(dependents_key) + + if pipeline is None: + pipe.execute() + break - dependent = self.job_class.fetch(job_id, connection=self.connection) - registry = DeferredJobRegistry(dependent.origin, self.connection) - with self.connection._pipeline() as pipeline: - registry.remove(dependent, pipeline=pipeline) - if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipeline) + except WatchError: + if pipeline is None: + continue else: - queue = Queue(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipeline) - pipeline.execute() + # if the pipeline comes from the caller, we re-raise the + # exception as it is the responsibility of the caller to + # handle it + raise def pop_job_id(self): """Pops a given job ID from this Redis queue.""" diff --git a/rq/worker.py b/rq/worker.py index b44ffe7..20554ee 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -678,10 +678,9 @@ class Worker(object): # if dependencies are 
inserted after enqueue_dependents # a WatchError is thrown by execute() pipeline.watch(job.dependents_key) + # enqueue_dependents calls multi() on the pipeline! queue.enqueue_dependents(job, pipeline=pipeline) - # pipeline all following commands (reads won't work!) - pipeline.multi() self.set_current_job_id(None, pipeline=pipeline) if result_ttl != 0: