diff --git a/.travis.yml b/.travis.yml index ff41e24..c804c42 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,11 +11,11 @@ python: - "pypy" install: - if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install -r py26-requirements.txt; fi - - pip install -r requirements.txt + - pip install -e . - pip install pytest-cov - pip install coveralls #- pip install pytest # installed by Travis by default already script: - - RUN_SLOW_TESTS_TOO=1 py.test --cov rq + - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5 after_success: - coveralls diff --git a/rq/worker.py b/rq/worker.py index 60683d0..831fe03 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,7 +16,7 @@ from datetime import timedelta from rq.compat import as_text, string_types, text_type -from .connections import get_current_connection +from .connections import get_current_connection, push_connection, pop_connection from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .exceptions import DequeueTimeout from .job import Job, JobStatus @@ -581,6 +581,9 @@ class Worker(object): self.prepare_job_execution(job) with self.connection._pipeline() as pipeline: + + push_connection(self.connection) + started_job_registry = StartedJobRegistry(job.origin, self.connection) try: @@ -623,6 +626,9 @@ class Worker(object): self.handle_exception(job, *sys.exc_info()) return False + finally: + pop_connection() + self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) diff --git a/tests/fixtures.py b/tests/fixtures.py index c1b7783..2d15d88 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -9,7 +9,7 @@ from __future__ import (absolute_import, division, print_function, import os import time -from rq import Connection, get_current_job +from rq import Connection, get_current_job, get_current_connection from rq.decorators import job from rq.compat import PY2 @@ -55,6 +55,7 @@ def create_file_after_timeout(path, timeout): def access_self(): + assert get_current_connection() is not None assert get_current_job() is not None diff --git a/tests/test_job.py b/tests/test_job.py index 413039d..1b2affe 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -302,6 +302,7 @@ class TestJob(RQTestCase): q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts w = Worker([q]) w.work(burst=True) + assert get_failed_queue(self.testconn).count == 0 def test_job_access_within_synchronous_job_function(self): queue = Queue(async=False) diff --git a/tests/test_queue.py b/tests/test_queue.py index 6f91ff9..b62fddc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -241,15 +241,19 @@ class TestQueue(RQTestCase): self.assertEqual(queue, fooq) self.assertEqual(job.func, say_hello) self.assertEqual(job.origin, fooq.name) - self.assertEqual(job.args[0], 'for Foo', - 'Foo should be dequeued first.') + self.assertEqual( + job.args[0], 'for Foo', + 'Foo should be dequeued first.' + ) job, queue = Queue.dequeue_any([fooq, barq], None) self.assertEqual(queue, barq) self.assertEqual(job.func, say_hello) self.assertEqual(job.origin, barq.name) - self.assertEqual(job.args[0], 'for Bar', - 'Bar should be dequeued second.') + self.assertEqual( + job.args[0], 'for Bar', + 'Bar should be dequeued second.' + ) def test_dequeue_any_ignores_nonexisting_jobs(self): """Dequeuing (from any queue) silently ignores non-existing jobs.""" @@ -260,8 +264,10 @@ class TestQueue(RQTestCase): # Dequeue simply ignores the missing job and returns None self.assertEqual(q.count, 1) - self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa - None) + self.assertEqual( + Queue.dequeue_any([Queue(), Queue('low')], None), # noqa + None + ) self.assertEqual(q.count, 0) def test_enqueue_sets_status(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 2762459..c63f97b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -8,13 +8,16 @@ from time import sleep import signal import time from multiprocessing import Process +import subprocess from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, do_nothing, say_hello, say_pid) + div_by_zero, do_nothing, say_hello, say_pid, + access_self) from tests.helpers import strip_microseconds -from rq import get_failed_queue, Queue, SimpleWorker, Worker +from rq import (get_failed_queue, Queue, SimpleWorker, Worker, + get_current_connection) from rq.compat import as_text, PY2 from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry @@ -82,12 +85,16 @@ class TestWorker(RQTestCase): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) - self.assertEqual(w.work(burst=True), False, - 'Did not expect any work on the queue.') + self.assertEqual( + w.work(burst=True), False, + 'Did not expect any work on the queue.' + ) fooq.enqueue(say_hello, name='Frank') - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) def test_worker_ttl(self): """Worker ttl.""" @@ -102,8 +109,10 @@ class TestWorker(RQTestCase): q = Queue('foo') w = Worker([q]) job = q.enqueue('tests.fixtures.say_hello', name='Frank') - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) self.assertEqual(job.result, 'Hi there, Frank!') def test_job_times(self): @@ -116,14 +125,25 @@ class TestWorker(RQTestCase): self.assertIsNotNone(job.enqueued_at) self.assertIsNone(job.started_at) self.assertIsNone(job.ended_at) - self.assertEqual(w.work(burst=True), True, - 'Expected at least some work done.') + self.assertEqual( + w.work(burst=True), True, + 'Expected at least some work done.' + ) self.assertEqual(job.result, 'Hi there, Stranger!') after = utcnow() job.refresh() - self.assertTrue(before <= job.enqueued_at <= after, 'Not %s <= %s <= %s' % (before, job.enqueued_at, after)) - self.assertTrue(before <= job.started_at <= after, 'Not %s <= %s <= %s' % (before, job.started_at, after)) - self.assertTrue(before <= job.ended_at <= after, 'Not %s <= %s <= %s' % (before, job.ended_at, after)) + self.assertTrue( + before <= job.enqueued_at <= after, + 'Not %s <= %s <= %s' % (before, job.enqueued_at, after) + ) + self.assertTrue( + before <= job.started_at <= after, + 'Not %s <= %s <= %s' % (before, job.started_at, after) + ) + self.assertTrue( + before <= job.ended_at <= after, + 'Not %s <= %s <= %s' % (before, job.ended_at, after) + ) def test_work_is_unreadable(self): """Unreadable jobs are put on the failed queue.""" @@ -557,14 +577,16 @@ def kill_worker(pid, double_kill): class TestWorkerShutdown(RQTestCase): def setUp(self): - # we want tests to fail if signal are ignored and the work remain running, - # so set a signal to kill them after 5 seconds + # we want tests to fail if signal are ignored and the work remain + # running, so set a signal to kill them after X seconds + self.killtimeout = 10 signal.signal(signal.SIGALRM, self._timeout) - signal.alarm(5) + signal.alarm(self.killtimeout) def _timeout(self, signal, frame): - raise AssertionError("test still running after 5 seconds, " - "likely the worker wasn't shutdown correctly") + raise AssertionError( + "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout + ) @slow def test_idle_worker_warm_shutdown(self): @@ -621,3 +643,35 @@ class TestWorkerShutdown(RQTestCase): shutdown_requested_date = w.shutdown_requested_date self.assertIsNotNone(shutdown_requested_date) self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') + + +def schedule_access_self(): + q = Queue('default', connection=get_current_connection()) + q.enqueue(access_self) + + +class TestWorkerSubprocess(RQTestCase): + def setUp(self): + super(TestWorkerSubprocess, self).setUp() + db_num = self.testconn.connection_pool.connection_kwargs['db'] + self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + + def test_run_empty_queue(self): + """Run the worker in its own process with an empty queue""" + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + + def test_run_access_self(self): + """Schedule a job, then run the worker as subprocess""" + q = Queue() + q.enqueue(access_self) + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + assert get_failed_queue().count == 0 + assert q.count == 0 + + def test_run_scheduled_access_self(self): + """Schedule a job that schedules a job, then run the worker as subprocess""" + q = Queue() + q.enqueue(schedule_access_self) + subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) + assert get_failed_queue().count == 0 + assert q.count == 0 diff --git a/tox.ini b/tox.ini index 28b6aef..93a76b9 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ envlist=py26,py27,py33,py34,py35,pypy,flake8 [testenv] -commands=py.test --cov rq {posargs} +commands=py.test --cov rq --durations=5 {posargs} deps= pytest pytest-cov