Merge pull request #434 from jtushman/redis-based-shutdown

triggering shutdown by setting a redis flag
Selwin Ong 10 years ago
commit 31dcb572a7
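In short, the PR makes suspension a shared Redis flag: `rq suspend` sets it (optionally with a TTL), workers check it before dequeueing, and `rq resume` clears it. A minimal sketch of the mechanism using plain redis-py (the key name comes from the suspension module added in the diff below; the URL is an illustrative assumption):

from redis import StrictRedis

conn = StrictRedis.from_url('redis://localhost:6379/0')
conn.set('rq:suspended', 1)         # what `rq suspend` does
conn.expire('rq:suspended', 60)     # what `--duration 60` adds on top
conn.exists('rq:suspended')         # what workers poll between jobs
conn.delete('rq:suspended')         # what `rq resume` does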

.gitignore

@ -10,6 +10,3 @@
.tox
.vagrant
Vagrantfile
# PyCharm
.idea

@ -1,2 +1,2 @@
redis
click
redis==2.7.0
click>=3.0.0

@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')
config_option = click.option('--config', '-c', help='Module containing RQ settings.')
def connect(url):
def connect(url, config=None):
settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')
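The reworked connect() now also takes the settings module passed via --config/-c and falls back to its REDIS_URL when no --url is given. A minimal sketch of such a settings module, assuming a hypothetical module name and URL (REDIS_URL is the key connect() reads through read_config_file()):

# mysettings.py  (illustrative; any importable module works)
REDIS_URL = 'redis://localhost:6379/1'

With that module on the Python path, e.g. `rq suspend -c mysettings` would connect without an explicit --url.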
@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
if is_suspended(conn):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)
try:
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
except ConnectionError as e:
print(e)
sys.exit(1)
@main.command()
@url_option
@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
def suspend(url, config, duration):
    """Suspends all workers, to resume run `rq resume`"""
    if duration is not None and duration < 1:
        click.echo("Duration must be an integer greater than 1")
        sys.exit(1)

    connection = connect(url, config)
    connection_suspend(connection, duration)

    if duration:
        msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but workers
        will automatically resume afterwards""".format(duration)
        click.echo(msg)
    else:
        click.echo("Suspending workers. No new jobs will be started, but current jobs will be completed")


@main.command()
@url_option
@config_option
def resume(url, config):
    """Resumes processing of queues that were suspended with `rq suspend`"""
    connection = connect(url, config)
    connection_resume(connection)
    click.echo("Resuming workers.")

@ -8,7 +8,9 @@ from functools import partial
import click
from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
@ -39,8 +41,9 @@ def get_scale(x):
def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
WorkerStatus.BUSY: red('busy'),
WorkerStatus.IDLE: green('idle'),
WorkerStatus.SUSPENDED: yellow('suspended'),
}
try:
return symbols[state]

@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse
from .utils import import_attribute, utcformat, utcnow, utcparse, enum
try:
import cPickle as pickle
@ -25,16 +25,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)
Status = enum('Status',
JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
@ -167,19 +158,19 @@ class Job(object):
@property
def is_finished(self):
return self.get_status() == Status.FINISHED
return self.get_status() == JobStatus.FINISHED
@property
def is_queued(self):
return self.get_status() == Status.QUEUED
return self.get_status() == JobStatus.QUEUED
@property
def is_failed(self):
return self.get_status() == Status.FAILED
return self.get_status() == JobStatus.FAILED
@property
def is_started(self):
return self.get_status() == Status.STARTED
return self.get_status() == JobStatus.STARTED
@property
def dependency(self):

@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
import uuid
from .connections import resolve_connection
from .job import Job, Status
from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
@ -180,7 +180,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
@ -195,7 +195,7 @@ class Queue(object):
while True:
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED:
if depends_on.get_status() != JobStatus.FINISHED:
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
@ -391,7 +391,7 @@ class Queue(object):
class FailedQueue(Queue):
def __init__(self, connection=None):
super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
@ -418,7 +418,7 @@ class FailedQueue(Queue):
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.set_status(Status.QUEUED)
job.set_status(JobStatus.QUEUED)
job.exc_info = None
q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)

@ -0,0 +1,18 @@
WORKERS_SUSPENDED = 'rq:suspended'


def is_suspended(connection):
    return connection.exists(WORKERS_SUSPENDED)


def suspend(connection, ttl=None):
    """ttl = time to live in seconds. Default is no expiration
    Note: If you pass in 0 it will invalidate right away
    """
    connection.set(WORKERS_SUSPENDED, 1)
    if ttl is not None:
        connection.expire(WORKERS_SUSPENDED, ttl)


def resume(connection):
    return connection.delete(WORKERS_SUSPENDED)
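A short usage sketch for the new module, assuming a Redis server on localhost and that rq is importable:

from redis import StrictRedis
from rq.suspension import suspend, is_suspended, resume

conn = StrictRedis()
suspend(conn, ttl=2)           # sets rq:suspended and lets it expire after 2 seconds
assert is_suspended(conn)
resume(conn)                   # or simply wait for the TTL to run out
assert not is_suspended(conn)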

@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
def enum(name, *sequential, **named):
    values = dict(zip(sequential, range(len(sequential))), **named)

    # NOTE: Yes, we *really* want to cast using str() here.
    # On Python 2 type() requires a byte string (which is str() on Python 2).
    # On Python 3 it does not matter, so we'll use str(), which acts as
    # a no-op.
    return type(str(name), (), values)
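As a quick illustration, the relocated helper just builds a plain class whose attributes are the given values, so the WorkerStatus constants defined in the worker module below compare equal to ordinary strings:

WorkerStatus = enum('WorkerStatus',
                    STARTED='started', SUSPENDED='suspended',
                    BUSY='busy', IDLE='idle')
assert WorkerStatus.SUSPENDED == 'suspended'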

@ -12,18 +12,20 @@ import sys
import time
import traceback
import warnings
from datetime import datetime
from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection
from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, Status
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
from .version import VERSION
from .registry import FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended
try:
from procname import setprocname
@ -65,6 +67,12 @@ def signal_name(signum):
return 'SIG_UNKNOWN'
WorkerStatus = enum('WorkerStatus',
STARTED='started', SUSPENDED='suspended', BUSY='busy',
IDLE='idle'
)
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers'
@ -333,6 +341,30 @@ class Worker(object):
        signal.signal(signal.SIGINT, request_stop)
        signal.signal(signal.SIGTERM, request_stop)

    def check_for_suspension(self, burst):
        """Check to see if the workers have been suspended by something like `rq suspend`"""

        before_state = None
        notified = False

        while not self.stopped and is_suspended(self.connection):

            if burst:
                self.log.info('Suspended in burst mode -- exiting.')
                self.log.info('Note: There could still be unperformed jobs on the queue')
                raise StopRequested

            if not notified:
                self.log.info('Worker suspended, use "rq resume" command to resume')
                before_state = self.get_state()
                self.set_state(WorkerStatus.SUSPENDED)
                notified = True

            time.sleep(1)

        if before_state:
            self.set_state(before_state)

    def work(self, burst=False):
        """Starts the work loop.
@ -348,15 +380,19 @@ class Worker(object):
did_perform_work = False
self.register_birth()
self.log.info('RQ worker started, version %s' % VERSION)
self.set_state('starting')
self.set_state(WorkerStatus.STARTED)
try:
while True:
try:
self.check_for_suspension(burst)
if self.stopped:
self.log.info('Stopping on request.')
break
timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
break
@ -367,20 +403,22 @@ class Worker(object):
self.execute_job(job)
self.heartbeat()
if job.get_status() == Status.FINISHED:
if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)
did_perform_work = True
finally:
if not self.is_horse:
self.register_death()
return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
self.set_state('idle')
self.set_state(WorkerStatus.IDLE)
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
@ -477,12 +515,12 @@ class Worker(object):
timeout = (job.timeout or 180) + 60
with self.connection._pipeline() as pipeline:
self.set_state('busy', pipeline=pipeline)
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(timeout, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(Status.STARTED, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % (
@ -511,7 +549,7 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.ended_at = utcnow()
job._status = Status.FINISHED
job._status = JobStatus.FINISHED
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@ -523,7 +561,7 @@ class Worker(object):
pipeline.execute()
except Exception:
job.set_status(Status.FAILED, pipeline=pipeline)
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
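Putting the worker-side changes together, a hedged end-to-end sketch of the burst-mode behaviour (it mirrors test_suspend_worker_execution in the tests below; the enqueued function and the default local Redis connection are illustrative assumptions):

from time import sleep
from redis import StrictRedis
from rq import Queue, Worker
from rq.suspension import suspend, resume

conn = StrictRedis()
q = Queue(connection=conn)
q.enqueue(sleep, 0)            # any importable function will do

suspend(conn)
Worker([q], connection=conn).work(burst=True)   # exits immediately, job stays queued
assert q.count == 1

resume(conn)
Worker([q], connection=conn).work(burst=True)   # now performs the job
assert q.count == 0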

@ -26,6 +26,17 @@ class TestCommandLine(TestCase):
class TestRQCli(RQTestCase):
    def assert_normal_execution(self, result):
        if result.exit_code == 0:
            return True
        else:
            print("Non normal execution")
            print("Exit Code: {}".format(result.exit_code))
            print("Output: {}".format(result.output))
            print("Exception: {}".format(result.exception))
            self.assertEqual(result.exit_code, 0)

    """Test rq_cli script"""

    def setUp(self):
        super(TestRQCli, self).setUp()
@ -41,25 +52,58 @@ class TestRQCli(RQTestCase):
"""rq empty -u <url> failed"""
runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_requeue(self):
"""rq requeue -u <url> --all"""
runner = CliRunner()
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
def test_info(self):
"""rq info -u <url>"""
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output)
def test_worker(self):
"""rq worker -u <url> -b"""
runner = CliRunner()
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEqual(result.exit_code, 0)
self.assert_normal_execution(result)
def test_suspend_and_resume(self):
"""rq suspend -u <url>
rq resume -u <url>
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url])
self.assert_normal_execution(result)
result = runner.invoke(main, ['resume', '-u', self.redis_url])
self.assert_normal_execution(result)
def test_suspend_with_ttl(self):
"""rq suspend -u <url> --duration=2
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
self.assert_normal_execution(result)
def test_suspend_with_invalid_ttl(self):
"""rq suspend -u <url> --duration=0
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output)

@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from rq.job import Job, Status
from rq.job import Job, JobStatus
from rq.worker import Worker
from tests import RQTestCase
@ -262,7 +262,7 @@ class TestQueue(RQTestCase):
"""Enqueueing a job sets its status to "queued"."""
q = Queue()
job = q.enqueue(say_hello)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_explicit_args(self):
"""enqueue() works for both implicit/explicit args."""
@ -346,7 +346,7 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(Status.FINISHED)
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id])
@ -361,7 +361,7 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(Status.FINISHED)
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [job.id])
@ -377,7 +377,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, 123)
# Jobs dependent on finished jobs are immediately enqueued
parent_job.set_status(Status.FINISHED)
parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
self.assertEqual(q.job_ids, [job.id])
@ -439,7 +439,7 @@ class TestFailedQueue(RQTestCase):
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_preserves_result_ttl(self):
"""Enqueueing persists result_ttl."""

@ -3,15 +3,17 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
from time import sleep
from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text
from rq.job import Job, Status
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import suspend, resume
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, say_hello, say_pid)
div_by_zero, say_hello, say_pid, do_nothing)
from tests.helpers import strip_microseconds
@ -222,14 +224,14 @@ class TestWorker(RQTestCase):
w = Worker([q])
job = q.enqueue(say_hello)
self.assertEqual(job.get_status(), Status.QUEUED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.is_queued, True)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, False)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FINISHED)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, True)
self.assertEqual(job.is_failed, False)
@ -238,7 +240,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(div_by_zero, args=(1,))
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FAILED)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
@ -251,13 +253,13 @@ class TestWorker(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), Status.FINISHED)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertNotEqual(job.get_status(), Status.FINISHED)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
def test_get_current_job(self):
"""Ensure worker.get_current_job() works properly"""
@ -319,6 +321,56 @@ class TestWorker(RQTestCase):
        self.assertEquals(job.result, 'Hi there, Adam!')
        self.assertEquals(job.description, '你好 世界!')

    def test_suspend_worker_execution(self):
        """Test Pause Worker Execution"""

        SENTINEL_FILE = '/tmp/rq-tests.txt'

        try:
            # Remove the sentinel if it is leftover from a previous test run
            os.remove(SENTINEL_FILE)
        except OSError as e:
            if e.errno != 2:
                raise

        q = Queue()
        job = q.enqueue(create_file, SENTINEL_FILE)

        w = Worker([q])

        suspend(self.testconn)

        w.work(burst=True)
        assert q.count == 1

        # Should not have created evidence of execution
        self.assertEquals(os.path.exists(SENTINEL_FILE), False)

        resume(self.testconn)
        w.work(burst=True)
        assert q.count == 0
        self.assertEquals(os.path.exists(SENTINEL_FILE), True)

    def test_suspend_with_duration(self):
        q = Queue()
        for _ in range(5):
            q.enqueue(do_nothing)

        w = Worker([q])

        # This suspends workers from working for 2 seconds
        suspend(self.testconn, 2)

        # So when this burst of work happens the queue should remain at 5
        w.work(burst=True)
        assert q.count == 5

        sleep(3)

        # The suspension should have expired by now, and a burst of work should clear the queue
        w.work(burst=True)
        assert q.count == 0

    def test_worker_hash_(self):
        """Workers are hashed by their .name attribute"""
        q = Queue('foo')
