Added DeferredJobsRegistry to keep track of deferred jobs.

main
Selwin Ong 10 years ago
parent 3e674fbe6a
commit dac0be6cc7

@ -544,8 +544,14 @@ class Job(object):
rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
This method adds the current job in its dependency's dependents set.
This method adds the job in its dependency's dependents set
and adds the job to DeferredJobRegistry.
"""
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin, connection=self.connection)
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)

@ -279,11 +279,16 @@ class Queue(object):
def enqueue_dependents(self, job):
"""Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.name, self.connection)
while True:
job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None:
break
dependent = self.job_class.fetch(job_id, connection=self.connection)
registry.remove(dependent)
self.enqueue_job(dependent)
def pop_job_id(self):

@ -24,7 +24,7 @@ class BaseRegistry(object):
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job, ttl, pipeline=None):
def add(self, job, ttl=0, pipeline=None):
"""Adds a job to a registry with expiry time of now + ttl."""
score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline is not None:
@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry):
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
class DeferredJobRegistry(BaseRegistry):
"""
Registry of deferred jobs (waiting for another job to finish).
"""
def __init__(self, name='default', connection=None):
super(DeferredJobRegistry, self).__init__(name, connection)
self.key = 'rq:deferred:%s' % name
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass

@ -7,6 +7,7 @@ from datetime import datetime
from rq.compat import as_text, PY2
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job
from rq.registry import DeferredJobRegistry
from rq.queue import Queue
from rq.utils import utcformat
@ -331,12 +332,18 @@ class TestJob(RQTestCase):
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self):
"""Test that jobs updates the correct job dependents."""
job = Job.create(func=say_hello)
"""Ensure dependency registration works properly."""
origin = 'some_queue'
registry = DeferredJobRegistry(origin, self.testconn)
job = Job.create(func=say_hello, origin=origin)
job._dependency_id = 'id'
job.save()
self.assertEqual(registry.get_job_ids(), [])
job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)
self.assertEqual(registry.get_job_ids(), [job.id])
def test_cancel(self):
"""job.cancel() deletes itself & dependents mapping from Redis."""

@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function,
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry
from rq.worker import Worker
from tests import RQTestCase
@ -319,23 +320,28 @@ class TestQueue(RQTestCase):
self.assertEquals(len(Queue.all()), 3)
def test_enqueue_dependents(self):
"""Enqueueing the dependent jobs pushes all jobs in the depends set to the queue."""
"""Enqueueing dependent jobs pushes all jobs in the depends set to the queue
and removes them from DeferredJobQueue."""
q = Queue()
parent_job = Job.create(func=say_hello)
parent_job.save()
job_1 = Job.create(func=say_hello, depends_on=parent_job)
job_1.save()
job_1.register_dependency()
job_2 = Job.create(func=say_hello, depends_on=parent_job)
job_2.save()
job_2.register_dependency()
job_1 = q.enqueue(say_hello, depends_on=parent_job)
job_2 = q.enqueue(say_hello, depends_on=parent_job)
registry = DeferredJobRegistry(q.name, connection=self.testconn)
self.assertEqual(
set(registry.get_job_ids()),
set([job_1.id, job_2.id])
)
# After dependents is enqueued, job_1 and job_2 should be in queue
self.assertEqual(q.job_ids, [])
q.enqueue_dependents(parent_job)
self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id]))
self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
self.assertFalse(self.testconn.exists(parent_job.dependents_key))
# DeferredJobRegistry should also be empty
self.assertEqual(registry.get_job_ids(), [])
def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued

@ -5,7 +5,8 @@ from rq.job import Job
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import FinishedJobRegistry, StartedJobRegistry
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
@ -135,3 +136,19 @@ class TestFinishedJobRegistry(RQTestCase):
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
class TestRegistry(RQTestCase):
def setUp(self):
super(TestRegistry, self).setUp()
self.registry = DeferredJobRegistry(connection=self.testconn)
def test_add(self):
"""Adding a job to DeferredJobsRegistry."""
job = Job()
self.registry.add(job)
self.assertEqual(
self.testconn.zrange(self.registry.key, 0, -1),
[job.id]
)
Loading…
Cancel
Save