diff --git a/.travis.yml b/.travis.yml index 0a9a69e..2d1529d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,6 @@ install: - pip install coveralls --use-mirrors #- pip install pytest # installed by Travis by default already script: - - py.test --cov rq + - RUN_SLOW_TESTS_TOO=1 py.test --cov rq after_success: - coveralls diff --git a/CHANGES.md b/CHANGES.md index 07ba340..74614ff 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,10 @@ +### 0.5.6 + +- Job results are now logged on `DEBUG` level. Thanks @tbaugis! +- Modified `patch_connection` so Redis connection can be easily mocked +- Customer exception handlers are now called if Redis connection is lost. Thanks @jlopex! +- Jobs can now depend on jobs in a different queue. Thanks @jlopex! + ### 0.5.5 (August 25th, 2015) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 8bb79de..e21e68a 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -103,6 +103,7 @@ def requeue(url, all, job_ids): @main.command() @url_option +@config_option @click.option('--path', '-P', default='.', help='Specify the import path.') @click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)') @click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') @@ -110,7 +111,7 @@ def requeue(url, all, job_ids): @click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') @click.argument('queues', nargs=-1) -def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): +def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, queues): """RQ command-line monitor.""" if path: @@ -124,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): func = show_both try: - with Connection(connect(url)): + with Connection(connect(url, config)): refresh(interval, func, queues, raw, by_queue) except ConnectionError as e: click.echo(e) diff --git a/rq/version.py b/rq/version.py index d1ec6a4..3798fab 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '0.5.5' +VERSION = '0.5.6' diff --git a/rq/worker.py b/rq/worker.py index f1cebf9..d701109 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -323,47 +323,47 @@ class Worker(object): gracefully. """ - def request_force_stop(signum, frame): - """Terminates the application (cold shutdown). - """ - self.log.warning('Cold shut down') - - # Take down the horse with the worker - if self.horse_pid: - msg = 'Taking down horse {0} with me'.format(self.horse_pid) - self.log.debug(msg) - try: - os.kill(self.horse_pid, signal.SIGKILL) - except OSError as e: - # ESRCH ("No such process") is fine with us - if e.errno != errno.ESRCH: - self.log.debug('Horse already down') - raise - raise SystemExit() + signal.signal(signal.SIGINT, self.request_stop) + signal.signal(signal.SIGTERM, self.request_stop) - def request_stop(signum, frame): - """Stops the current worker loop but waits for child processes to - end gracefully (warm shutdown). - """ - self.log.debug('Got signal {0}'.format(signal_name(signum))) + def request_force_stop(self, signum, frame): + """Terminates the application (cold shutdown). + """ + self.log.warning('Cold shut down') - signal.signal(signal.SIGINT, request_force_stop) - signal.signal(signal.SIGTERM, request_force_stop) + # Take down the horse with the worker + if self.horse_pid: + msg = 'Taking down horse {0} with me'.format(self.horse_pid) + self.log.debug(msg) + try: + os.kill(self.horse_pid, signal.SIGKILL) + except OSError as e: + # ESRCH ("No such process") is fine with us + if e.errno != errno.ESRCH: + self.log.debug('Horse already down') + raise + raise SystemExit() + + def request_stop(self, signum, frame): + """Stops the current worker loop but waits for child processes to + end gracefully (warm shutdown). + """ + self.log.debug('Got signal {0}'.format(signal_name(signum))) - msg = 'Warm shut down requested' - self.log.warning(msg) + signal.signal(signal.SIGINT, self.request_force_stop) + signal.signal(signal.SIGTERM, self.request_force_stop) - # If shutdown is requested in the middle of a job, wait until - # finish before shutting down - if self.get_state() == 'busy': - self._stop_requested = True - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') - else: - raise StopRequested() + msg = 'Warm shut down requested' + self.log.warning(msg) - signal.signal(signal.SIGINT, request_stop) - signal.signal(signal.SIGTERM, request_stop) + # If shutdown is requested in the middle of a job, wait until + # finish before shutting down + if self.get_state() == 'busy': + self._stop_requested = True + self.log.debug('Stopping after current horse is finished. ' + 'Press Ctrl+C again for a cold shutdown.') + else: + raise StopRequested() def check_for_suspension(self, burst): """Check to see if workers have been suspended by `rq suspend`""" @@ -491,9 +491,10 @@ class Worker(object): within the given timeout bounds, or will end the work horse with SIGALRM. """ + self.set_state('busy') + child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id - child_pid = os.fork() if child_pid == 0: self.main_work_horse(job) else: @@ -501,7 +502,6 @@ class Worker(object): self.procline('Forked {0} at {0}'.format(child_pid, time.time())) while True: try: - self.set_state('busy') os.waitpid(child_pid, 0) self.set_state('idle') break @@ -601,10 +601,10 @@ class Worker(object): self.handle_exception(job, *sys.exc_info()) return False - if rv is None: - self.log.info('Job OK') - else: - self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv)))) + self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) + if rv: + log_result = "{0!r}".format(as_text(text_type(rv))) + self.log.debug('Result: {0}'.format(yellow(log_result))) if result_ttl == 0: self.log.info('Result discarded immediately') @@ -665,6 +665,7 @@ class Worker(object): def clean_registries(self): """Runs maintenance jobs on each Queue's registries.""" for queue in self.queues: + self.log.info('Cleaning registries for queue: {0}'.format(queue.name)) clean_registries(queue) self.last_cleaned_at = utcnow() diff --git a/run_tests b/run_tests index 50736cc..3b454d1 100755 --- a/run_tests +++ b/run_tests @@ -17,9 +17,9 @@ else safe_rg=cat fi -export ONLY_RUN_FAST_TESTS=1 +export RUN_SLOW_TESTS_TOO=1 if [ "$1" = '-f' ]; then # Poor man's argparse - unset ONLY_RUN_FAST_TESTS + unset RUN_SLOW_TESTS_TOO shift 1 fi diff --git a/tests/__init__.py b/tests/__init__.py index 93f11b3..7371a92 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -32,7 +32,7 @@ def slow(f): @wraps(f) def _inner(*args, **kwargs): - if os.environ.get('ONLY_RUN_FAST_TESTS'): + if os.environ.get('RUN_SLOW_TESTS_TOO'): f(*args, **kwargs) return _inner diff --git a/tests/test_worker.py b/tests/test_worker.py index 40b84b4..4528ae1 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,6 +5,9 @@ from __future__ import (absolute_import, division, print_function, import os from datetime import timedelta from time import sleep +import signal +import time +from multiprocessing import Process from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, @@ -468,3 +471,74 @@ class TestWorker(RQTestCase): worker = Worker(queue, connection=self.testconn) worker.work(burst=True) self.assertEqual(self.testconn.zcard(registry.key), 0) + + +def kill_worker(pid, double_kill): + # wait for the worker to be started over on the main process + time.sleep(0.5) + os.kill(pid, signal.SIGTERM) + if double_kill: + # give the worker time to switch signal handler + time.sleep(0.5) + os.kill(pid, signal.SIGTERM) + + +class TestWorkerShutdown(RQTestCase): + def setUp(self): + # we want tests to fail if signal are ignored and the work remain running, + # so set a signal to kill them after 5 seconds + signal.signal(signal.SIGALRM, self._timeout) + signal.alarm(5) + + def _timeout(self, signal, frame): + raise AssertionError("test still running after 5 seconds, " + "likely the worker wasn't shutdown correctly") + + @slow + def test_idle_worker_warm_shutdown(self): + """worker with no ongoing job receiving single SIGTERM signal and shutting down""" + w = Worker('foo') + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(1) + self.assertFalse(w._stop_requested) + + @slow + def test_working_worker_warm_shutdown(self): + """worker with an ongoing job receiving single SIGTERM signal, allowing job to finish then shutting down""" + fooq = Queue('foo') + w = Worker(fooq) + + sentinel_file = '/tmp/.rq_sentinel_warm' + fooq.enqueue(create_file_after_timeout, sentinel_file, 2) + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(2) + self.assertTrue(w._stop_requested) + self.assertTrue(os.path.exists(sentinel_file)) + + @slow + def test_working_worker_cold_shutdown(self): + """worker with an ongoing job receiving double SIGTERM signal and shutting down immediately""" + fooq = Queue('foo') + w = Worker(fooq) + sentinel_file = '/tmp/.rq_sentinel_cold' + fooq.enqueue(create_file_after_timeout, sentinel_file, 2) + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), True)) + p.start() + + self.assertRaises(SystemExit, w.work) + + p.join(1) + self.assertTrue(w._stop_requested) + self.assertFalse(os.path.exists(sentinel_file)) + diff --git a/tox.ini b/tox.ini index ce49502..08172e4 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=py26,py27,py33,py34,pypy,flake8 +envlist=py26,py27,py33,py34,py35,pypy,flake8 [testenv] commands=py.test --cov rq {posargs}