Merge branch 'master' of github.com:nvie/rq

main
Selwin Ong 10 years ago
commit 5360222a25

@ -25,9 +25,14 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads loads = pickle.loads
JobStatus = enum('JobStatus', JobStatus = enum(
QUEUED='queued', FINISHED='finished', FAILED='failed', 'JobStatus',
STARTED='started') QUEUED='queued',
FINISHED='finished',
FAILED='failed',
STARTED='started',
DEFERRED='deferred'
)
# Sentinel value to mark that some of our lazily evaluated properties have not # Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated. # yet been evaluated.
@ -83,8 +88,8 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, result_ttl=None, ttl=None, status=None, description=None,
id=None): depends_on=None, timeout=None, id=None, origin=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -102,6 +107,9 @@ class Job(object):
if id is not None: if id is not None:
job.set_id(id) job.set_id(id)
if origin is not None:
job.origin = origin
# Set the core job tuple properties # Set the core job tuple properties
job._instance = None job._instance = None
if inspect.ismethod(func): if inspect.ismethod(func):
@ -536,8 +544,14 @@ class Job(object):
rq:job:job_id:dependents = {'job_id_1', 'job_id_2'} rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
This method adds the current job in its dependency's dependents set. This method adds the job in its dependency's dependents set
and adds the job to DeferredJobRegistry.
""" """
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin, connection=self.connection)
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.sadd(Job.dependents_key_for(self._dependency_id), self.id) connection.sadd(Job.dependents_key_for(self._dependency_id), self.id)

@ -182,11 +182,11 @@ class Queue(object):
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
# TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(
job = self.job_class.create(func, args, kwargs, connection=self.connection, func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=JobStatus.QUEUED, result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout, description=description, depends_on=depends_on,
id=job_id) timeout=timeout, id=job_id, origin=self.name)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's dependents instead of enqueueing it. # parent's dependents instead of enqueueing it.
@ -200,6 +200,7 @@ class Queue(object):
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
if depends_on.get_status() != JobStatus.FINISHED: if depends_on.get_status() != JobStatus.FINISHED:
job.set_status(JobStatus.DEFERRED)
job.register_dependency(pipeline=pipe) job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe) job.save(pipeline=pipe)
pipe.execute() pipe.execute()
@ -248,24 +249,25 @@ class Queue(object):
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id, at_front=at_front) job_id=job_id, at_front=at_front)
def enqueue_job(self, job, set_meta_data=True, at_front=False): def enqueue_job(self, job, at_front=False):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`.
If Queue is instantiated with async=False, job is executed immediately. If Queue is instantiated with async=False, job is executed immediately.
""" """
with self.connection._pipeline() as pipeline:
# Add Queue key set # Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key) self.connection.sadd(self.redis_queues_keys, self.key)
job.set_status(JobStatus.QUEUED, pipeline=pipeline)
if set_meta_data:
job.origin = self.name job.origin = self.name
job.enqueued_at = utcnow() job.enqueued_at = utcnow()
if job.timeout is None: if job.timeout is None:
job.timeout = self.DEFAULT_TIMEOUT job.timeout = self.DEFAULT_TIMEOUT
job.save() job.save(pipeline=pipeline)
pipeline.execute()
if self._async: if self._async:
self.push_job_id(job.id, at_front=at_front) self.push_job_id(job.id, at_front=at_front)
@ -277,11 +279,16 @@ class Queue(object):
def enqueue_dependents(self, job): def enqueue_dependents(self, job):
"""Enqueues all jobs in the given job's dependents set and clears it.""" """Enqueues all jobs in the given job's dependents set and clears it."""
# TODO: can probably be pipelined # TODO: can probably be pipelined
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.name, self.connection)
while True: while True:
job_id = as_text(self.connection.spop(job.dependents_key)) job_id = as_text(self.connection.spop(job.dependents_key))
if job_id is None: if job_id is None:
break break
dependent = self.job_class.fetch(job_id, connection=self.connection) dependent = self.job_class.fetch(job_id, connection=self.connection)
registry.remove(dependent)
self.enqueue_job(dependent) self.enqueue_job(dependent)
def pop_job_id(self): def pop_job_id(self):
@ -401,14 +408,20 @@ class FailedQueue(Queue):
def quarantine(self, job, exc_info): def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed """Puts the given Job in quarantine (i.e. put it on the failed
queue). queue).
This is different from normal job enqueueing, since certain meta data
must not be overridden (e.g. `origin` or `enqueued_at`) and other meta
data must be inserted (`ended_at` and `exc_info`).
""" """
with self.connection._pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
job.ended_at = utcnow() job.ended_at = utcnow()
job.exc_info = exc_info job.exc_info = exc_info
return self.enqueue_job(job, set_meta_data=False) job.save(pipeline=pipeline)
self.push_job_id(job.id, pipeline=pipeline)
pipeline.execute()
return job
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""

@ -24,7 +24,7 @@ class BaseRegistry(object):
self.cleanup() self.cleanup()
return self.connection.zcard(self.key) return self.connection.zcard(self.key)
def add(self, job, ttl, pipeline=None): def add(self, job, ttl=0, pipeline=None):
"""Adds a job to a registry with expiry time of now + ttl.""" """Adds a job to a registry with expiry time of now + ttl."""
score = ttl if ttl < 0 else current_timestamp() + ttl score = ttl if ttl < 0 else current_timestamp() + ttl
if pipeline is not None: if pipeline is not None:
@ -108,3 +108,19 @@ class FinishedJobRegistry(BaseRegistry):
""" """
score = timestamp if timestamp is not None else current_timestamp() score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score) self.connection.zremrangebyscore(self.key, 0, score)
class DeferredJobRegistry(BaseRegistry):
"""
Registry of deferred jobs (waiting for another job to finish).
"""
def __init__(self, name='default', connection=None):
super(DeferredJobRegistry, self).__init__(name, connection)
self.key = 'rq:deferred:%s' % name
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass

@ -7,6 +7,7 @@ from datetime import datetime
from rq.compat import as_text, PY2 from rq.compat import as_text, PY2
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job from rq.job import get_current_job, Job
from rq.registry import DeferredJobRegistry
from rq.queue import Queue from rq.queue import Queue
from rq.utils import utcformat from rq.utils import utcformat
@ -331,12 +332,18 @@ class TestJob(RQTestCase):
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self): def test_register_dependency(self):
"""Test that jobs updates the correct job dependents.""" """Ensure dependency registration works properly."""
job = Job.create(func=say_hello) origin = 'some_queue'
registry = DeferredJobRegistry(origin, self.testconn)
job = Job.create(func=say_hello, origin=origin)
job._dependency_id = 'id' job._dependency_id = 'id'
job.save() job.save()
self.assertEqual(registry.get_job_ids(), [])
job.register_dependency() job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)
self.assertEqual(registry.get_job_ids(), [job.id])
def test_cancel(self): def test_cancel(self):
"""job.cancel() deletes itself & dependents mapping from Redis.""" """job.cancel() deletes itself & dependents mapping from Redis."""

@ -5,6 +5,7 @@ from __future__ import (absolute_import, division, print_function,
from rq import get_failed_queue, Queue from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
from tests import RQTestCase from tests import RQTestCase
@ -117,6 +118,7 @@ class TestQueue(RQTestCase):
# say_hello spec holds which queue this is sent to # say_hello spec holds which queue this is sent to
job = q.enqueue(say_hello, 'Nick', foo='bar') job = q.enqueue(say_hello, 'Nick', foo='bar')
job_id = job.id job_id = job.id
self.assertEqual(job.origin, q.name)
# Inspect data inside Redis # Inspect data inside Redis
q_key = 'rq:queue:default' q_key = 'rq:queue:default'
@ -131,14 +133,12 @@ class TestQueue(RQTestCase):
job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar')) job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
# Preconditions # Preconditions
self.assertIsNone(job.origin)
self.assertIsNone(job.enqueued_at) self.assertIsNone(job.enqueued_at)
# Action # Action
q.enqueue_job(job) q.enqueue_job(job)
# Postconditions # Postconditions
self.assertEquals(job.origin, q.name)
self.assertIsNotNone(job.enqueued_at) self.assertIsNotNone(job.enqueued_at)
def test_pop_job_id(self): def test_pop_job_id(self):
@ -320,30 +320,36 @@ class TestQueue(RQTestCase):
self.assertEquals(len(Queue.all()), 3) self.assertEquals(len(Queue.all()), 3)
def test_enqueue_dependents(self): def test_enqueue_dependents(self):
"""Enqueueing the dependent jobs pushes all jobs in the depends set to the queue.""" """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
and removes them from DeferredJobQueue."""
q = Queue() q = Queue()
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save() parent_job.save()
job_1 = Job.create(func=say_hello, depends_on=parent_job) job_1 = q.enqueue(say_hello, depends_on=parent_job)
job_1.save() job_2 = q.enqueue(say_hello, depends_on=parent_job)
job_1.register_dependency()
job_2 = Job.create(func=say_hello, depends_on=parent_job)
job_2.save()
job_2.register_dependency()
registry = DeferredJobRegistry(q.name, connection=self.testconn)
self.assertEqual(
set(registry.get_job_ids()),
set([job_1.id, job_2.id])
)
# After dependents is enqueued, job_1 and job_2 should be in queue # After dependents is enqueued, job_1 and job_2 should be in queue
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
q.enqueue_dependents(parent_job) q.enqueue_dependents(parent_job)
self.assertEqual(set(q.job_ids), set([job_1.id, job_2.id])) self.assertEqual(set(q.job_ids), set([job_2.id, job_1.id]))
self.assertFalse(self.testconn.exists(parent_job.dependents_key)) self.assertFalse(self.testconn.exists(parent_job.dependents_key))
# DeferredJobRegistry should also be empty
self.assertEqual(registry.get_job_ids(), [])
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):
"""Jobs are enqueued only when their dependencies are finished.""" """Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
# Jobs dependent on finished jobs are immediately enqueued # Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(JobStatus.FINISHED) parent_job.set_status(JobStatus.FINISHED)
@ -351,6 +357,7 @@ class TestQueue(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Enqueueing jobs should work as expected by id as well as job-objects.""" """Enqueueing jobs should work as expected by id as well as job-objects."""
@ -368,7 +375,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
def test_enqueue_job_with_dependency_and_timeout(self): def test_enqueue_job_with_dependency_and_timeout(self):
"""Jobs still know their specified timeout after being scheduled as a dependency.""" """Jobs remember their timeout when enqueued as a dependency."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()

@ -1,11 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import from __future__ import absolute_import
from rq.compat import as_text
from rq.job import Job from rq.job import Job
from rq.queue import FailedQueue, Queue from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
from rq.registry import FinishedJobRegistry, StartedJobRegistry from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello from tests.fixtures import div_by_zero, say_hello
@ -119,7 +121,6 @@ class TestFinishedJobRegistry(RQTestCase):
self.registry.cleanup(timestamp + 20) self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz']) self.assertEqual(self.registry.get_job_ids(), ['baz'])
def test_jobs_are_put_in_registry(self): def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry.""" """Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), []) self.assertEqual(self.registry.get_job_ids(), [])
@ -135,3 +136,18 @@ class TestFinishedJobRegistry(RQTestCase):
failed_job = queue.enqueue(div_by_zero) failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job) worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id]) self.assertEqual(self.registry.get_job_ids(), [job.id])
class TestDeferredRegistry(RQTestCase):
def setUp(self):
super(TestDeferredRegistry, self).setUp()
self.registry = DeferredJobRegistry(connection=self.testconn)
def test_add(self):
"""Adding a job to DeferredJobsRegistry."""
job = Job()
self.registry.add(job)
job_ids = [as_text(job_id) for job_id in
self.testconn.zrange(self.registry.key, 0, -1)]
self.assertEqual(job_ids, [job.id])

Loading…
Cancel
Save