completely pipelined enqueue_dependents

main
Stefan Hammer 8 years ago
parent 7d267cf7ef
commit 67ae9277e5

@ -543,7 +543,7 @@ class Job(object):
forever) forever)
""" """
if ttl == 0: if ttl == 0:
self.delete(remove_from_queue=remove_from_queue) self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
elif not ttl: elif not ttl:
return return
elif ttl > 0: elif ttl > 0:

@ -280,35 +280,61 @@ class Queue(object):
job.timeout = self.DEFAULT_TIMEOUT job.timeout = self.DEFAULT_TIMEOUT
job.save(pipeline=pipe) job.save(pipeline=pipe)
if self._async:
self.push_job_id(job.id, pipeline=pipe, at_front=at_front)
if pipeline is None: if pipeline is None:
pipe.execute() pipe.execute()
if self._async:
self.push_job_id(job.id, at_front=at_front)
return job return job
def enqueue_dependents(self, job, pipeline=None): def enqueue_dependents(self, job, pipeline=None):
"""Enqueues all jobs in the given job's dependents set and clears it.""" """Enqueues all jobs in the given job's dependents set and clears it.
# TODO: can probably be pipelined
When called without a pipeline, this method uses WATCH/MULTI/EXEC.
If you pass a pipeline, only MULTI is called. The rest is up to the
caller.
"""
from .registry import DeferredJobRegistry from .registry import DeferredJobRegistry
dependents_connection = pipeline if pipeline is not None else self.connection pipe = pipeline if pipeline is not None else self.connection._pipeline()
dependents_key = job.dependents_key
while True: while True:
job_id = as_text(dependents_connection.spop(job.dependents_key)) try:
if job_id is None: # if a pipeline is passed, the caller is responsible for calling WATCH
break # to ensure all jobs are enqueued
dependent = self.job_class.fetch(job_id, connection=self.connection) if pipeline is None:
pipe.watch(dependents_key)
dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
for job_id in pipe.smembers(dependents_key)]
pipe.multi()
for dependent in dependent_jobs:
registry = DeferredJobRegistry(dependent.origin, self.connection) registry = DeferredJobRegistry(dependent.origin, self.connection)
with self.connection._pipeline() as pipeline: registry.remove(dependent, pipeline=pipe)
registry.remove(dependent, pipeline=pipeline)
if dependent.origin == self.name: if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipeline) self.enqueue_job(dependent, pipeline=pipe)
else: else:
queue = Queue(name=dependent.origin, connection=self.connection) queue = Queue(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipeline) queue.enqueue_job(dependent, pipeline=pipe)
pipeline.execute()
pipe.delete(dependents_key)
if pipeline is None:
pipe.execute()
break
except WatchError:
if pipeline is None:
continue
else:
# if the pipeline comes from the caller, we re-raise the
# exception as it it the responsibility of the caller to
# handle it
raise
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""

@ -678,10 +678,9 @@ class Worker(object):
# if dependencies are inserted after enqueue_dependents # if dependencies are inserted after enqueue_dependents
# a WatchError is thrown by execute() # a WatchError is thrown by execute()
pipeline.watch(job.dependents_key) pipeline.watch(job.dependents_key)
# enqueue_dependents calls multi() on the pipeline!
queue.enqueue_dependents(job, pipeline=pipeline) queue.enqueue_dependents(job, pipeline=pipeline)
# pipeline all following commands (reads won't work!)
pipeline.multi()
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
if result_ttl != 0: if result_ttl != 0:

Loading…
Cancel
Save