Added WorkingQueue class.

main
Selwin Ong 11 years ago
parent 3de8a47f06
commit e61d1505bc

@ -8,6 +8,7 @@ terminal colorizing code, originally by Georg Brandl.
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import calendar
import importlib import importlib
import datetime import datetime
import logging import logging
@ -229,3 +230,8 @@ def first(iterable, default=None, key=None):
return el return el
return default return default
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())

@ -0,0 +1,38 @@
from .connections import resolve_connection
from .queue import FailedQueue
from .utils import current_timestamp
class WorkingQueue:
"""
Registry of currently executing jobs. Each queue maintains a WorkingQueue.
WorkingQueue contains job keys that are currently being executed.
Each key is scored by job's expiration time (datetime started + timeout).
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
"""
def __init__(self, name, connection=None):
self.name = name
self.key = 'rq:wip:%s' % name
self.connection = resolve_connection(connection)
def add(self, job, timeout):
"""Adds a job to WorkingQueue with expiry time of now + timeout."""
return self.connection._zadd(self.key, current_timestamp() + timeout,
job.id)
def remove(self, job):
return self.connection.zrem(self.key, job.id)
def get_expired_job_ids(self):
"""Returns job ids whose score are less than current timestamp."""
return self.connection.zrangebyscore(self.key, 0, current_timestamp())
def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids."""
return self.connection.zrange(self.key, start, end)

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from rq.job import Job
from rq.utils import current_timestamp
from rq.working_queue import WorkingQueue
from tests import RQTestCase
class TestQueue(RQTestCase):
def setUp(self):
super(TestQueue, self).setUp()
self.working_queue = WorkingQueue('default', connection=self.testconn)
def test_add_and_remove(self):
"""Adding and removing job to WorkingQueue."""
timestamp = current_timestamp()
job = Job()
# Test that job is added with the right score
self.working_queue.add(job, 1000)
self.assertLess(self.testconn.zscore(self.working_queue.key, job.id),
timestamp + 1001)
# Ensure that job is properly removed from sorted set
self.working_queue.remove(job)
self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id))
def test_get_job_ids(self):
"""Getting job ids from WorkingQueue."""
self.testconn.zadd(self.working_queue.key, 1, 'foo')
self.testconn.zadd(self.working_queue.key, 10, 'bar')
self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar'])
def test_get_expired_job_ids(self):
"""Getting expired job ids form WorkingQueue."""
timestamp = current_timestamp()
self.testconn.zadd(self.working_queue.key, 1, 'foo')
self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar')
self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo'])
Loading…
Cancel
Save