From acbcea0c66544227fb2b282e276948ffad5a4e8b Mon Sep 17 00:00:00 2001 From: Arnold Krille Date: Sat, 9 Jan 2016 16:07:47 +0100 Subject: [PATCH 1/3] Add the workers connection to _connection_stack This allows jobs to use get_current_connection() with a resolvable connection. And then these jobs can schedule new jobs for example (my use-case). Or attach information to their job-object. Also pop the pushed connection after running the jobs. This is needed for some tests that check the _connection_stack afterwards;-) And also for use-cases where the workers are used multiple times. fixes nvie/rq#479 --- rq/worker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 60683d0..831fe03 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,7 +16,7 @@ from datetime import timedelta from rq.compat import as_text, string_types, text_type -from .connections import get_current_connection +from .connections import get_current_connection, push_connection, pop_connection from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .exceptions import DequeueTimeout from .job import Job, JobStatus @@ -581,6 +581,9 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + + push_connection(self.connection) + started_job_registry = StartedJobRegistry(job.origin, self.connection) try: @@ -623,6 +626,9 @@ class Worker(object): self.handle_exception(job, *sys.exc_info()) return False + finally: + pop_connection() + self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) From df22f127eb2377f1e51d1dc5837479e1faf4539e Mon Sep 17 00:00:00 2001 From: Arnold Krille Date: Sat, 9 Jan 2016 16:43:44 +0100 Subject: [PATCH 2/3] Test the worker in its own subprocess - run with an empty queue - schedule one job (which uses get_current_connection and get_current_job) and run `rqworker` - schedule a job that itself schedules `access_self` and run `rqworker` - Make sure the job didn't fail by assuring the failed queue is still empty afterwards. - Install this package locally when running in travis. This actually unifies the behaviour of tox and travis as tox also builds the package and then installs it into each test environment. - fix flake8 (as run by tox) --- .travis.yml | 2 +- tests/fixtures.py | 3 +- tests/test_job.py | 1 + tests/test_queue.py | 18 ++++++---- tests/test_worker.py | 82 ++++++++++++++++++++++++++++++++++++-------- 5 files changed, 83 insertions(+), 23 deletions(-) diff --git a/.travis.yml b/.travis.yml index ff41e24..eca50b3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ python: - "pypy" install: - if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install -r py26-requirements.txt; fi - - pip install -r requirements.txt + - pip install -e . - pip install pytest-cov - pip install coveralls #- pip install pytest # installed by Travis by default already diff --git a/tests/fixtures.py b/tests/fixtures.py index c1b7783..2d15d88 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -9,7 +9,7 @@ from __future__ import (absolute_import, division, print_function, import os import time -from rq import Connection, get_current_job +from rq import Connection, get_current_job, get_current_connection from rq.decorators import job from rq.compat import PY2 @@ -55,6 +55,7 @@ def create_file_after_timeout(path, timeout): def access_self(): + assert get_current_connection() is not None assert get_current_job() is not None diff --git a/tests/test_job.py b/tests/test_job.py index 413039d..1b2affe 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -302,6 +302,7 @@ class TestJob(RQTestCase): q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts w = Worker([q]) w.work(burst=True) + assert get_failed_queue(self.testconn).count == 0 def test_job_access_within_synchronous_job_function(self): queue = Queue(async=False) diff --git a/tests/test_queue.py b/tests/test_queue.py index 6f91ff9..b62fddc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -241,15 +241,19 @@ class TestQueue(RQTestCase): self.assertEqual(queue, fooq) self.assertEqual(job.func, say_hello) self.assertEqual(job.origin, fooq.name) - self.assertEqual(job.args[0], 'for Foo', - 'Foo should be dequeued first.') + self.assertEqual( + job.args[0], 'for Foo', + 'Foo should be dequeued first.' + ) job, queue = Queue.dequeue_any([fooq, barq], None) self.assertEqual(queue, barq) self.assertEqual(job.func, say_hello) self.assertEqual(job.origin, barq.name) - self.assertEqual(job.args[0], 'for Bar', - 'Bar should be dequeued second.') + self.assertEqual( + job.args[0], 'for Bar', + 'Bar should be dequeued second.' + ) def test_dequeue_any_ignores_nonexisting_jobs(self): """Dequeuing (from any queue) silently ignores non-existing jobs.""" @@ -260,8 +264,10 @@ class TestQueue(RQTestCase): # Dequeue simply ignores the missing job and returns None self.assertEqual(q.count, 1) - self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa - None) + self.assertEqual( + Queue.dequeue_any([Queue(), Queue('low')], None), # noqa + None + ) self.assertEqual(q.count, 0) def test_enqueue_sets_status(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 2762459..e3177c3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -8,13 +8,16 @@ from time import sleep import signal import time from multiprocessing import Process +import subprocess from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, do_nothing, say_hello, say_pid) + div_by_zero, do_nothing, say_hello, say_pid, + access_self) from tests.helpers import strip_microseconds -from rq import get_failed_queue, Queue, SimpleWorker, Worker +from rq import (get_failed_queue, Queue, SimpleWorker, Worker, + get_current_connection) from rq.compat import as_text, PY2 from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry @@ -82,12 +85,16 @@ class TestWorker(RQTestCase): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) - self.assertEqual(w.work(burst=True), False, - 'Did not expect any work on the queue.') + self.assertEqual( + w.work(burst=True), False, + 'Did not expect any work on the queue.' + ) fooq.enqueue(say_hello, name='Frank') - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) def test_worker_ttl(self): """Worker ttl.""" @@ -102,8 +109,10 @@ class TestWorker(RQTestCase): q = Queue('foo') w = Worker([q]) job = q.enqueue('tests.fixtures.say_hello', name='Frank') - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) self.assertEqual(job.result, 'Hi there, Frank!') def test_job_times(self): @@ -116,14 +125,25 @@ class TestWorker(RQTestCase): self.assertIsNotNone(job.enqueued_at) self.assertIsNone(job.started_at) self.assertIsNone(job.ended_at) - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) self.assertEqual(job.result, 'Hi there, Stranger!') after = utcnow() job.refresh() - self.assertTrue(before <= job.enqueued_at <= after, 'Not %s <= %s <= %s' % (before, job.enqueued_at, after)) - self.assertTrue(before <= job.started_at <= after, 'Not %s <= %s <= %s' % (before, job.started_at, after)) - self.assertTrue(before <= job.ended_at <= after, 'Not %s <= %s <= %s' % (before, job.ended_at, after)) + self.assertTrue( + before <= job.enqueued_at <= after, + 'Not %s <= %s <= %s' % (before, job.enqueued_at, after) + ) + self.assertTrue( + before <= job.started_at <= after, + 'Not %s <= %s <= %s' % (before, job.started_at, after) + ) + self.assertTrue( + before <= job.ended_at <= after, + 'Not %s <= %s <= %s' % (before, job.ended_at, after) + ) def test_work_is_unreadable(self): """Unreadable jobs are put on the failed queue.""" @@ -557,8 +577,8 @@ def kill_worker(pid, double_kill): class TestWorkerShutdown(RQTestCase): def setUp(self): - # we want tests to fail if signal are ignored and the work remain running, - # so set a signal to kill them after 5 seconds + # we want tests to fail if signal are ignored and the work remain + # running, so set a signal to kill them after 5 seconds signal.signal(signal.SIGALRM, self._timeout) signal.alarm(5) @@ -621,3 +641,35 @@ class TestWorkerShutdown(RQTestCase): shutdown_requested_date = w.shutdown_requested_date self.assertIsNotNone(shutdown_requested_date) self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') + + +def schedule_access_self(): + q = Queue('default', connection=get_current_connection()) + q.enqueue(access_self) + + +class TestWorkerSubprocess(RQTestCase): + def setUp(self): + super(TestWorkerSubprocess, self).setUp() + db_num = self.testconn.connection_pool.connection_kwargs['db'] + self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + + def test_run_empty_queue(self): + """Run the worker in its own process with an empty queue""" + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + + def test_run_access_self(self): + """Schedule a job, then run the worker as subprocess""" + q = Queue() + q.enqueue(access_self) + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + assert get_failed_queue().count == 0 + assert q.count == 0 + + def test_run_scheduled_access_self(self): + """Schedule a job that schedules a job, then run the worker as subprocess""" + q = Queue() + q.enqueue(schedule_access_self) + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + assert get_failed_queue().count == 0 + assert q.count == 0 From 8e99706b16041c8805c70f7e3b67604447d2770e Mon Sep 17 00:00:00 2001 From: Arnold Krille Date: Sat, 27 Feb 2016 17:57:34 +0100 Subject: [PATCH 3/3] run python 3.5 on travis, adopt timeouts Also - Report the five slowest tests to watch for risk of timeout - Double timeouts. Maybe that helps pypy on travis to finish successfully. --- .travis.yml | 2 +- tests/test_worker.py | 10 ++++++---- tox.ini | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index eca50b3..c804c42 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,6 @@ install: - pip install coveralls #- pip install pytest # installed by Travis by default already script: - - RUN_SLOW_TESTS_TOO=1 py.test --cov rq + - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5 after_success: - coveralls diff --git a/tests/test_worker.py b/tests/test_worker.py index e3177c3..c63f97b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -578,13 +578,15 @@ def kill_worker(pid, double_kill): class TestWorkerShutdown(RQTestCase): def setUp(self): # we want tests to fail if signal are ignored and the work remain - # running, so set a signal to kill them after 5 seconds + # running, so set a signal to kill them after X seconds + self.killtimeout = 10 signal.signal(signal.SIGALRM, self._timeout) - signal.alarm(5) + signal.alarm(self.killtimeout) def _timeout(self, signal, frame): - raise AssertionError("test still running after 5 seconds, " - "likely the worker wasn't shutdown correctly") + raise AssertionError( + "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout + ) @slow def test_idle_worker_warm_shutdown(self): diff --git a/tox.ini b/tox.ini index 28b6aef..93a76b9 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ envlist=py26,py27,py33,py34,py35,pypy,flake8 [testenv] -commands=py.test --cov rq {posargs} +commands=py.test --cov rq --durations=5 {posargs} deps= pytest pytest-cov