Merge pull request #564 from jlopex/javi/fix_dependent_jobs_on_different_queues

support multiple queues on dependent jobs
main
Selwin Ong 9 years ago
commit 1a089887c0

@ -260,24 +260,26 @@ class Queue(object):
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front) job_id=job_id, at_front=at_front)
def enqueue_job(self, job, at_front=False): def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with async=False, job is executed immediately.
""" """
with self.connection._pipeline() as pipeline: pipe = pipeline if pipeline is not None else self.connection._pipeline()
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
job.origin = self.name # Add Queue key set
job.enqueued_at = utcnow() pipe.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipe)
if job.timeout is None: job.origin = self.name
job.timeout = self.DEFAULT_TIMEOUT job.enqueued_at = utcnow()
job.save(pipeline=pipeline)
pipeline.execute() if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT
job.save(pipeline=pipe)
if pipeline is None:
pipe.execute()
if self._async: if self._async:
self.push_job_id(job.id, at_front=at_front) self.push_job_id(job.id, at_front=at_front)
@ -289,15 +291,20 @@ class Queue(object):
# TODO: can probably be pipelined # TODO: can probably be pipelined
from .registry import DeferredJobRegistry from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.name, self.connection)
while True: while True:
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = self.job_class.fetch(job_id, connection=self.connection) dependent = self.job_class.fetch(job_id, connection=self.connection)
registry.remove(dependent) registry = DeferredJobRegistry(dependent.origin, self.connection)
self.enqueue_job(dependent) with self.connection._pipeline() as pipeline:
registry.remove(dependent, pipeline=pipeline)
if dependent.origin == self.name:
self.enqueue_job(dependent, pipeline=pipeline)
else:
queue = Queue(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipeline)
pipeline.execute()
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""

@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
from datetime import datetime from datetime import datetime
import time import time
from tests import RQTestCase from tests import RQTestCase, fixtures
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
@ -16,7 +16,7 @@ from rq.registry import DeferredJobRegistry
from rq.utils import utcformat from rq.utils import utcformat
from rq.worker import Worker from rq.worker import Worker
from . import fixtures
try: try:
from cPickle import loads, dumps from cPickle import loads, dumps

@ -350,6 +350,45 @@ class TestQueue(RQTestCase):
# DeferredJobRegistry should also be empty # DeferredJobRegistry should also be empty
self.assertEqual(registry.get_job_ids(), []) self.assertEqual(registry.get_job_ids(), [])
def test_enqueue_dependents_on_multiple_queues(self):
"""Enqueueing dependent jobs on multiple queues pushes jobs in the queues
and removes them from DeferredJobRegistry for each different queue."""
q_1 = Queue("queue_1")
q_2 = Queue("queue_2")
parent_job = Job.create(func=say_hello)
parent_job.save()
job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
job_2 = q_2.enqueue(say_hello, depends_on=parent_job)
# Each queue has its own DeferredJobRegistry
registry_1 = DeferredJobRegistry(q_1.name, connection=self.testconn)
self.assertEqual(
set(registry_1.get_job_ids()),
set([job_1.id])
)
registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
self.assertEqual(
set(registry_2.get_job_ids()),
set([job_2.id])
)
# After dependents is enqueued, job_1 on queue_1 and
# job_2 should be in queue_2
self.assertEqual(q_1.job_ids, [])
self.assertEqual(q_2.job_ids, [])
q_1.enqueue_dependents(parent_job)
q_2.enqueue_dependents(parent_job)
self.assertEqual(set(q_1.job_ids), set([job_1.id]))
self.assertEqual(set(q_2.job_ids), set([job_2.id]))
self.assertFalse(self.testconn.exists(parent_job.dependents_key))
# DeferredJobRegistry should also be empty
self.assertEqual(registry_1.get_job_ids(), [])
self.assertEqual(registry_2.get_job_ids(), [])
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished.""" """Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued

Loading…
Cancel
Save