Merge remote-tracking branch 'upstream/master'

Branch: main
Michael Keirnan, 9 years ago
commit 14723ecc1f

@@ -15,6 +15,6 @@ install:
   - pip install coveralls --use-mirrors
 #- pip install pytest  # installed by Travis by default already
 script:
-  - py.test --cov rq
+  - RUN_SLOW_TESTS_TOO=1 py.test --cov rq
 after_success:
   - coveralls

@@ -1,3 +1,10 @@
+### 0.5.6
+
+- Job results are now logged on `DEBUG` level. Thanks @tbaugis!
+- Modified `patch_connection` so Redis connection can be easily mocked
+- Custom exception handlers are now called if Redis connection is lost. Thanks @jlopex!
+- Jobs can now depend on jobs in a different queue. Thanks @jlopex!
+
 ### 0.5.5
 (August 25th, 2015)
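
The cross-queue dependency entry above can be exercised roughly like this. A minimal sketch, not part of this commit; `myapp.tasks` and its functions are made up, only `Queue.enqueue(..., depends_on=...)` is the real API:

    from redis import Redis
    from rq import Queue
    from myapp.tasks import extract, transform   # hypothetical job functions

    redis_conn = Redis()
    default_q = Queue('default', connection=redis_conn)
    reports_q = Queue('reports', connection=redis_conn)

    parent = default_q.enqueue(extract, 'dataset-1')
    # As of 0.5.6, the dependent job may live on a different queue than its parent.
    child = reports_q.enqueue(transform, 'dataset-1', depends_on=parent)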

@@ -103,6 +103,7 @@ def requeue(url, all, job_ids):
 @main.command()
 @url_option
+@config_option
 @click.option('--path', '-P', default='.', help='Specify the import path.')
 @click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
 @click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@@ -110,7 +111,7 @@ def requeue(url, all, job_ids):
 @click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
 @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
 @click.argument('queues', nargs=-1)
-def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
+def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, queues):
     """RQ command-line monitor."""
     if path:
@@ -124,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
         func = show_both
 
     try:
-        with Connection(connect(url)):
+        with Connection(connect(url, config)):
             refresh(interval, func, queues, raw, by_queue)
     except ConnectionError as e:
         click.echo(e)
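
With `@config_option` added, `rq info` can pick up its Redis connection from a settings module the same way the other commands do. A hedged sketch; the module name and its contents are assumptions, not part of this diff:

    # rq_settings.py -- hypothetical settings module passed to the CLI
    REDIS_URL = 'redis://localhost:6379/2'

Assuming the module is importable from the working directory, the monitor would then be started as `rq info --config rq_settings`, the idea being that `connect(url, config)` falls back to the module's `REDIS_URL` when no `--url` is given.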

@@ -2,4 +2,4 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-VERSION = '0.5.5'
+VERSION = '0.5.6'

@@ -323,47 +323,47 @@ class Worker(object):
         gracefully.
         """
-        def request_force_stop(signum, frame):
-            """Terminates the application (cold shutdown).
-            """
-            self.log.warning('Cold shut down')
-
-            # Take down the horse with the worker
-            if self.horse_pid:
-                msg = 'Taking down horse {0} with me'.format(self.horse_pid)
-                self.log.debug(msg)
-                try:
-                    os.kill(self.horse_pid, signal.SIGKILL)
-                except OSError as e:
-                    # ESRCH ("No such process") is fine with us
-                    if e.errno != errno.ESRCH:
-                        self.log.debug('Horse already down')
-                        raise
-            raise SystemExit()
-
-        def request_stop(signum, frame):
-            """Stops the current worker loop but waits for child processes to
-            end gracefully (warm shutdown).
-            """
-            self.log.debug('Got signal {0}'.format(signal_name(signum)))
-
-            signal.signal(signal.SIGINT, request_force_stop)
-            signal.signal(signal.SIGTERM, request_force_stop)
-
-            msg = 'Warm shut down requested'
-            self.log.warning(msg)
-
-            # If shutdown is requested in the middle of a job, wait until
-            # finish before shutting down
-            if self.get_state() == 'busy':
-                self._stop_requested = True
-                self.log.debug('Stopping after current horse is finished. '
-                               'Press Ctrl+C again for a cold shutdown.')
-            else:
-                raise StopRequested()
-
-        signal.signal(signal.SIGINT, request_stop)
-        signal.signal(signal.SIGTERM, request_stop)
+        signal.signal(signal.SIGINT, self.request_stop)
+        signal.signal(signal.SIGTERM, self.request_stop)
+
+    def request_force_stop(self, signum, frame):
+        """Terminates the application (cold shutdown).
+        """
+        self.log.warning('Cold shut down')
+
+        # Take down the horse with the worker
+        if self.horse_pid:
+            msg = 'Taking down horse {0} with me'.format(self.horse_pid)
+            self.log.debug(msg)
+            try:
+                os.kill(self.horse_pid, signal.SIGKILL)
+            except OSError as e:
+                # ESRCH ("No such process") is fine with us
+                if e.errno != errno.ESRCH:
+                    self.log.debug('Horse already down')
+                    raise
+        raise SystemExit()
+
+    def request_stop(self, signum, frame):
+        """Stops the current worker loop but waits for child processes to
+        end gracefully (warm shutdown).
+        """
+        self.log.debug('Got signal {0}'.format(signal_name(signum)))
+
+        signal.signal(signal.SIGINT, self.request_force_stop)
+        signal.signal(signal.SIGTERM, self.request_force_stop)
+
+        msg = 'Warm shut down requested'
+        self.log.warning(msg)
+
+        # If shutdown is requested in the middle of a job, wait until
+        # finish before shutting down
+        if self.get_state() == 'busy':
+            self._stop_requested = True
+            self.log.debug('Stopping after current horse is finished. '
+                           'Press Ctrl+C again for a cold shutdown.')
+        else:
+            raise StopRequested()
 
     def check_for_suspension(self, burst):
         """Check to see if workers have been suspended by `rq suspend`"""
@@ -491,9 +491,10 @@ class Worker(object):
         within the given timeout bounds, or will end the work horse with
         SIGALRM.
         """
-        child_pid = os.fork()
+        self.set_state('busy')
         os.environ['RQ_WORKER_ID'] = self.name
         os.environ['RQ_JOB_ID'] = job.id
+        child_pid = os.fork()
         if child_pid == 0:
             self.main_work_horse(job)
         else:
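
The reordering matters because the work horse inherits the parent's environment at fork time; setting `RQ_WORKER_ID`/`RQ_JOB_ID` before `os.fork()` means they are exported exactly once and are visible in the child. A tiny illustration of that fork semantics (not RQ code, the variable name is invented):

    import os

    os.environ['DEMO_JOB_ID'] = 'abc123'      # set in the parent, pre-fork
    pid = os.fork()
    if pid == 0:                              # child ("work horse")
        print('child sees', os.environ.get('DEMO_JOB_ID'))
        os._exit(0)
    else:                                     # parent waits for the child
        os.waitpid(pid, 0)
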
@@ -501,7 +502,6 @@ class Worker(object):
             self.procline('Forked {0} at {0}'.format(child_pid, time.time()))
             while True:
                 try:
-                    self.set_state('busy')
                     os.waitpid(child_pid, 0)
                     self.set_state('idle')
                     break
@@ -601,10 +601,10 @@ class Worker(object):
                 self.handle_exception(job, *sys.exc_info())
                 return False
 
-        if rv is None:
-            self.log.info('Job OK')
-        else:
-            self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv))))
+        self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
+        if rv:
+            log_result = "{0!r}".format(as_text(text_type(rv)))
+            self.log.debug('Result: {0}'.format(yellow(log_result)))
 
         if result_ttl == 0:
             self.log.info('Result discarded immediately')
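
Since job results now go to `DEBUG` rather than `INFO`, they only show up when the worker's logger is verbose enough. A minimal sketch for an embedded worker, assuming the standard `rq.worker` logger name:

    import logging

    # surface the new 'Result: ...' lines emitted at DEBUG level
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('rq.worker').setLevel(logging.DEBUG)
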
@@ -665,6 +665,7 @@ class Worker(object):
     def clean_registries(self):
         """Runs maintenance jobs on each Queue's registries."""
         for queue in self.queues:
+            self.log.info('Cleaning registries for queue: {0}'.format(queue.name))
             clean_registries(queue)
         self.last_cleaned_at = utcnow()

@@ -17,9 +17,9 @@ else
     safe_rg=cat
 fi
 
-export ONLY_RUN_FAST_TESTS=1
+export RUN_SLOW_TESTS_TOO=1
 if [ "$1" = '-f' ]; then  # Poor man's argparse
-    unset ONLY_RUN_FAST_TESTS
+    unset RUN_SLOW_TESTS_TOO
     shift 1
 fi

@@ -32,7 +32,7 @@ def slow(f):
     @wraps(f)
     def _inner(*args, **kwargs):
-        if os.environ.get('ONLY_RUN_FAST_TESTS'):
+        if os.environ.get('RUN_SLOW_TESTS_TOO'):
             f(*args, **kwargs)
     return _inner
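
For reference, the renamed switch is consumed by tests like this. A hedged usage sketch; the test class is made up, only the `slow` decorator and `RQTestCase` come from the diff:

    # the decorated body only runs when RUN_SLOW_TESTS_TOO is set,
    # e.g. via `RUN_SLOW_TESTS_TOO=1 py.test` or the default `./run_tests`
    from tests import RQTestCase, slow

    class TestSomethingSlow(RQTestCase):
        @slow
        def test_takes_a_while(self):
            self.assertTrue(True)   # placeholder for a long-running assertion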

@@ -5,6 +5,9 @@ from __future__ import (absolute_import, division, print_function,
 import os
 from datetime import timedelta
 from time import sleep
+import signal
+import time
+from multiprocessing import Process
 
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
@@ -468,3 +471,74 @@ class TestWorker(RQTestCase):
         worker = Worker(queue, connection=self.testconn)
         worker.work(burst=True)
         self.assertEqual(self.testconn.zcard(registry.key), 0)
+
+
+def kill_worker(pid, double_kill):
+    # wait for the worker to be started on the main process
+    time.sleep(0.5)
+    os.kill(pid, signal.SIGTERM)
+    if double_kill:
+        # give the worker time to switch signal handlers
+        time.sleep(0.5)
+        os.kill(pid, signal.SIGTERM)
+
+
+class TestWorkerShutdown(RQTestCase):
+    def setUp(self):
+        # we want the tests to fail if signals are ignored and the work keeps
+        # running, so set an alarm to kill them after 5 seconds
+        signal.signal(signal.SIGALRM, self._timeout)
+        signal.alarm(5)
+
+    def _timeout(self, signal, frame):
+        raise AssertionError("test still running after 5 seconds, "
+                             "likely the worker wasn't shut down correctly")
+
+    @slow
+    def test_idle_worker_warm_shutdown(self):
+        """worker with no ongoing job receiving single SIGTERM signal and shutting down"""
+        w = Worker('foo')
+        self.assertFalse(w._stop_requested)
+        p = Process(target=kill_worker, args=(os.getpid(), False))
+        p.start()
+
+        w.work()
+
+        p.join(1)
+        self.assertFalse(w._stop_requested)
+
+    @slow
+    def test_working_worker_warm_shutdown(self):
+        """worker with an ongoing job receiving single SIGTERM signal, allowing job to finish then shutting down"""
+        fooq = Queue('foo')
+        w = Worker(fooq)
+
+        sentinel_file = '/tmp/.rq_sentinel_warm'
+        fooq.enqueue(create_file_after_timeout, sentinel_file, 2)
+        self.assertFalse(w._stop_requested)
+        p = Process(target=kill_worker, args=(os.getpid(), False))
+        p.start()
+
+        w.work()
+
+        p.join(2)
+        self.assertTrue(w._stop_requested)
+        self.assertTrue(os.path.exists(sentinel_file))
+
+    @slow
+    def test_working_worker_cold_shutdown(self):
+        """worker with an ongoing job receiving double SIGTERM signal and shutting down immediately"""
+        fooq = Queue('foo')
+        w = Worker(fooq)
+
+        sentinel_file = '/tmp/.rq_sentinel_cold'
+        fooq.enqueue(create_file_after_timeout, sentinel_file, 2)
+        self.assertFalse(w._stop_requested)
+        p = Process(target=kill_worker, args=(os.getpid(), True))
+        p.start()
+
+        self.assertRaises(SystemExit, w.work)
+
+        p.join(1)
+        self.assertTrue(w._stop_requested)
+        self.assertFalse(os.path.exists(sentinel_file))

@ -1,5 +1,5 @@
[tox] [tox]
envlist=py26,py27,py33,py34,pypy,flake8 envlist=py26,py27,py33,py34,py35,pypy,flake8
[testenv] [testenv]
commands=py.test --cov rq {posargs} commands=py.test --cov rq {posargs}
