Merge branch 'master' of github.com:nvie/rq into cli-rq-worker

main
zhangliyong 10 years ago
commit 3d49784bcc

@ -68,6 +68,9 @@ class Queue(object):
job_class = import_attribute(job_class)
self.job_class = job_class
def __len__(self):
return self.count
@property
def key(self):
"""Returns the Redis key for this Queue."""

@ -4,23 +4,29 @@ from .queue import FailedQueue
from .utils import current_timestamp
class StartedJobRegistry:
class BaseRegistry(object):
"""
Registry of currently executing jobs. Each queue maintains a StartedJobRegistry.
StartedJobRegistry contains job keys that are currently being executed.
Each key is scored by job's expiration time (datetime started + timeout).
Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp).
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
Jobs with scores are lower than current time is considered "expired" and
should be cleaned up.
"""
def __init__(self, name='default', connection=None):
self.name = name
self.key = 'rq:wip:%s' % name
self.connection = resolve_connection(connection)
def __len__(self):
"""Returns the number of jobs in this registry"""
return self.count
@property
def count(self):
"""Returns the number of jobs in this registry"""
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job, timeout, pipeline=None):
"""Adds a job to StartedJobRegistry with expiry time of now + timeout."""
score = current_timestamp() + timeout
@ -40,11 +46,28 @@ class StartedJobRegistry:
def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids."""
self.move_expired_jobs_to_failed_queue()
self.cleanup()
return [as_text(job_id) for job_id in
self.connection.zrange(self.key, start, end)]
def move_expired_jobs_to_failed_queue(self):
class StartedJobRegistry(BaseRegistry):
"""
Registry of currently executing jobs. Each queue maintains a
StartedJobRegistry. Jobs in this registry are ones that are currently
being executed.
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
"""
def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name
def cleanup(self):
"""Remove expired jobs from registry and add them to FailedQueue."""
job_ids = self.get_expired_job_ids()
@ -53,6 +76,22 @@ class StartedJobRegistry:
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.zremrangebyscore(self.key, 0, current_timestamp())
pipeline.execute()
return job_ids
class FinishedJobRegistry(BaseRegistry):
"""
Registry of jobs that have been completed. Jobs are added to this
registry after they have successfully completed for monitoring purposes.
"""
def __init__(self, name='default', connection=None):
super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name
def cleanup(self):
"""Remove expired jobs from registry."""
self.connection.zremrangebyscore(self.key, 0, current_timestamp())

@ -23,7 +23,7 @@ from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION
from .registry import StartedJobRegistry
from .registry import FinishedJobRegistry, StartedJobRegistry
try:
from procname import setprocname
@ -496,7 +496,7 @@ class Worker(object):
self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline:
registry = StartedJobRegistry(job.origin, self.connection)
started_job_registry = StartedJobRegistry(job.origin)
try:
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
@ -513,14 +513,18 @@ class Worker(object):
job.ended_at = utcnow()
job._status = Status.FINISHED
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin)
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
except Exception:
job.set_status(Status.FAILED, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
return False

@ -101,11 +101,13 @@ class TestQueue(RQTestCase):
q.enqueue(say_hello, 'Charlie')
self.testconn.lpush(q.key, '1', '2')
self.assertEquals(q.count, 4)
self.assertEqual(q.count, 4)
self.assertEqual(len(q), 4)
q.compact()
self.assertEquals(q.count, 2)
self.assertEqual(q.count, 2)
self.assertEqual(len(q), 2)
def test_enqueue(self):
"""Enqueueing job onto queues."""

@ -5,16 +5,16 @@ from rq.job import Job
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import StartedJobRegistry
from rq.registry import FinishedJobRegistry, StartedJobRegistry
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class TestQueue(RQTestCase):
class TestRegistry(RQTestCase):
def setUp(self):
super(TestQueue, self).setUp()
super(TestRegistry, self).setUp()
self.registry = StartedJobRegistry(connection=self.testconn)
def test_add_and_remove(self):
@ -33,8 +33,9 @@ class TestQueue(RQTestCase):
def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry."""
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, 10, 'bar')
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
def test_get_expired_job_ids(self):
@ -51,8 +52,9 @@ class TestQueue(RQTestCase):
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 1, 'foo')
self.registry.move_expired_jobs_to_failed_queue()
self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)
def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""
@ -76,3 +78,43 @@ class TestQueue(RQTestCase):
worker.perform_job(job)
self.assertNotIn(job.id, registry.get_job_ids())
def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10
self.testconn.zadd(self.registry.key, timestamp, 'foo')
self.testconn.zadd(self.registry.key, timestamp, 'bar')
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
class TestFinishedJobRegistry(RQTestCase):
def setUp(self):
super(TestFinishedJobRegistry, self).setUp()
self.registry = FinishedJobRegistry(connection=self.testconn)
def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar'])
def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), [])
queue = Queue(connection=self.testconn)
worker = Worker([queue])
# Completed jobs are put in FinishedJobRegistry
job = queue.enqueue(say_hello)
worker.perform_job(job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
# Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id])
Loading…
Cancel
Save