diff --git a/rq/utils.py b/rq/utils.py index d85235e..885e5b0 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -8,6 +8,7 @@ terminal colorizing code, originally by Georg Brandl. from __future__ import (absolute_import, division, print_function, unicode_literals) +import calendar import importlib import datetime import logging @@ -229,3 +230,8 @@ def first(iterable, default=None, key=None): return el return default + + +def current_timestamp(): + """Returns current UTC timestamp""" + return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) diff --git a/rq/working_queue.py b/rq/working_queue.py new file mode 100644 index 0000000..3e5da45 --- /dev/null +++ b/rq/working_queue.py @@ -0,0 +1,38 @@ +from .connections import resolve_connection +from .queue import FailedQueue +from .utils import current_timestamp + + +class WorkingQueue: + """ + Registry of currently executing jobs. Each queue maintains a WorkingQueue. + WorkingQueue contains job keys that are currently being executed. + Each key is scored by job's expiration time (datetime started + timeout). + + Jobs are added to registry right before they are executed and removed + right after completion (success or failure). + + Jobs whose score are lower than current time is considered "expired". + """ + + def __init__(self, name, connection=None): + self.name = name + self.key = 'rq:wip:%s' % name + self.connection = resolve_connection(connection) + + def add(self, job, timeout): + """Adds a job to WorkingQueue with expiry time of now + timeout.""" + return self.connection._zadd(self.key, current_timestamp() + timeout, + job.id) + + def remove(self, job): + return self.connection.zrem(self.key, job.id) + + def get_expired_job_ids(self): + """Returns job ids whose score are less than current timestamp.""" + return self.connection.zrangebyscore(self.key, 0, current_timestamp()) + + def get_job_ids(self, start=0, end=-1): + """Returns list of all job ids.""" + return self.connection.zrange(self.key, start, end) + diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py new file mode 100644 index 0000000..9c1a1e5 --- /dev/null +++ b/tests/test_working_queue.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from rq.job import Job +from rq.utils import current_timestamp +from rq.working_queue import WorkingQueue + +from tests import RQTestCase + + +class TestQueue(RQTestCase): + + def setUp(self): + super(TestQueue, self).setUp() + self.working_queue = WorkingQueue('default', connection=self.testconn) + + def test_add_and_remove(self): + """Adding and removing job to WorkingQueue.""" + timestamp = current_timestamp() + job = Job() + + # Test that job is added with the right score + self.working_queue.add(job, 1000) + self.assertLess(self.testconn.zscore(self.working_queue.key, job.id), + timestamp + 1001) + + # Ensure that job is properly removed from sorted set + self.working_queue.remove(job) + self.assertIsNone(self.testconn.zscore(self.working_queue.key, job.id)) + + def test_get_job_ids(self): + """Getting job ids from WorkingQueue.""" + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, 10, 'bar') + self.assertEqual(self.working_queue.get_job_ids(), ['foo', 'bar']) + + def test_get_expired_job_ids(self): + """Getting expired job ids form WorkingQueue.""" + timestamp = current_timestamp() + + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') + + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file