Merge pull request #641 from kampfschlaefer/fix_connection_stack_in_stock_worker

Fix connection stack in stock worker
main
Selwin Ong 9 years ago committed by GitHub
commit 0c5fe6251e

@ -11,11 +11,11 @@ python:
- "pypy" - "pypy"
install: install:
- if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install -r py26-requirements.txt; fi - if [[ $TRAVIS_PYTHON_VERSION == 2.6 ]]; then pip install -r py26-requirements.txt; fi
- pip install -r requirements.txt - pip install -e .
- pip install pytest-cov - pip install pytest-cov
- pip install coveralls - pip install coveralls
#- pip install pytest # installed by Travis by default already #- pip install pytest # installed by Travis by default already
script: script:
- RUN_SLOW_TESTS_TOO=1 py.test --cov rq - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5
after_success: after_success:
- coveralls - coveralls

@ -16,7 +16,7 @@ from datetime import timedelta
from rq.compat import as_text, string_types, text_type from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
from .exceptions import DequeueTimeout from .exceptions import DequeueTimeout
from .job import Job, JobStatus from .job import Job, JobStatus
@ -581,6 +581,9 @@ class Worker(object):
self.prepare_job_execution(job) self.prepare_job_execution(job)
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
push_connection(self.connection)
started_job_registry = StartedJobRegistry(job.origin, self.connection) started_job_registry = StartedJobRegistry(job.origin, self.connection)
try: try:
@ -623,6 +626,9 @@ class Worker(object):
self.handle_exception(job, *sys.exc_info()) self.handle_exception(job, *sys.exc_info())
return False return False
finally:
pop_connection()
self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
if rv is not None: if rv is not None:
log_result = "{0!r}".format(as_text(text_type(rv))) log_result = "{0!r}".format(as_text(text_type(rv)))

@ -9,7 +9,7 @@ from __future__ import (absolute_import, division, print_function,
import os import os
import time import time
from rq import Connection, get_current_job from rq import Connection, get_current_job, get_current_connection
from rq.decorators import job from rq.decorators import job
from rq.compat import PY2 from rq.compat import PY2
@ -55,6 +55,7 @@ def create_file_after_timeout(path, timeout):
def access_self(): def access_self():
assert get_current_connection() is not None
assert get_current_job() is not None assert get_current_job() is not None

@ -302,6 +302,7 @@ class TestJob(RQTestCase):
q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
assert get_failed_queue(self.testconn).count == 0
def test_job_access_within_synchronous_job_function(self): def test_job_access_within_synchronous_job_function(self):
queue = Queue(async=False) queue = Queue(async=False)

@ -241,15 +241,19 @@ class TestQueue(RQTestCase):
self.assertEqual(queue, fooq) self.assertEqual(queue, fooq)
self.assertEqual(job.func, say_hello) self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, fooq.name) self.assertEqual(job.origin, fooq.name)
self.assertEqual(job.args[0], 'for Foo', self.assertEqual(
'Foo should be dequeued first.') job.args[0], 'for Foo',
'Foo should be dequeued first.'
)
job, queue = Queue.dequeue_any([fooq, barq], None) job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEqual(queue, barq) self.assertEqual(queue, barq)
self.assertEqual(job.func, say_hello) self.assertEqual(job.func, say_hello)
self.assertEqual(job.origin, barq.name) self.assertEqual(job.origin, barq.name)
self.assertEqual(job.args[0], 'for Bar', self.assertEqual(
'Bar should be dequeued second.') job.args[0], 'for Bar',
'Bar should be dequeued second.'
)
def test_dequeue_any_ignores_nonexisting_jobs(self): def test_dequeue_any_ignores_nonexisting_jobs(self):
"""Dequeuing (from any queue) silently ignores non-existing jobs.""" """Dequeuing (from any queue) silently ignores non-existing jobs."""
@ -260,8 +264,10 @@ class TestQueue(RQTestCase):
# Dequeue simply ignores the missing job and returns None # Dequeue simply ignores the missing job and returns None
self.assertEqual(q.count, 1) self.assertEqual(q.count, 1)
self.assertEqual(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa self.assertEqual(
None) Queue.dequeue_any([Queue(), Queue('low')], None), # noqa
None
)
self.assertEqual(q.count, 0) self.assertEqual(q.count, 0)
def test_enqueue_sets_status(self): def test_enqueue_sets_status(self):

@ -8,13 +8,16 @@ from time import sleep
import signal import signal
import time import time
from multiprocessing import Process from multiprocessing import Process
import subprocess
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid) div_by_zero, do_nothing, say_hello, say_pid,
access_self)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
get_current_connection)
from rq.compat import as_text, PY2 from rq.compat import as_text, PY2
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
@ -82,12 +85,16 @@ class TestWorker(RQTestCase):
"""Worker processes work, then quits.""" """Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar') fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq]) w = Worker([fooq, barq])
self.assertEqual(w.work(burst=True), False, self.assertEqual(
'Did not expect any work on the queue.') w.work(burst=True), False,
'Did not expect any work on the queue.'
)
fooq.enqueue(say_hello, name='Frank') fooq.enqueue(say_hello, name='Frank')
self.assertEqual(w.work(burst=True), True, self.assertEqual(
'Expected at least some work done.') w.work(burst=True), True,
'Expected at least some work done.'
)
def test_worker_ttl(self): def test_worker_ttl(self):
"""Worker ttl.""" """Worker ttl."""
@ -102,8 +109,10 @@ class TestWorker(RQTestCase):
q = Queue('foo') q = Queue('foo')
w = Worker([q]) w = Worker([q])
job = q.enqueue('tests.fixtures.say_hello', name='Frank') job = q.enqueue('tests.fixtures.say_hello', name='Frank')
self.assertEqual(w.work(burst=True), True, self.assertEqual(
'Expected at least some work done.') w.work(burst=True), True,
'Expected at least some work done.'
)
self.assertEqual(job.result, 'Hi there, Frank!') self.assertEqual(job.result, 'Hi there, Frank!')
def test_job_times(self): def test_job_times(self):
@ -116,14 +125,25 @@ class TestWorker(RQTestCase):
self.assertIsNotNone(job.enqueued_at) self.assertIsNotNone(job.enqueued_at)
self.assertIsNone(job.started_at) self.assertIsNone(job.started_at)
self.assertIsNone(job.ended_at) self.assertIsNone(job.ended_at)
self.assertEqual(w.work(burst=True), True, self.assertEqual(
'Expected at least some work done.') w.work(burst=True), True,
'Expected at least some work done.'
)
self.assertEqual(job.result, 'Hi there, Stranger!') self.assertEqual(job.result, 'Hi there, Stranger!')
after = utcnow() after = utcnow()
job.refresh() job.refresh()
self.assertTrue(before <= job.enqueued_at <= after, 'Not %s <= %s <= %s' % (before, job.enqueued_at, after)) self.assertTrue(
self.assertTrue(before <= job.started_at <= after, 'Not %s <= %s <= %s' % (before, job.started_at, after)) before <= job.enqueued_at <= after,
self.assertTrue(before <= job.ended_at <= after, 'Not %s <= %s <= %s' % (before, job.ended_at, after)) 'Not %s <= %s <= %s' % (before, job.enqueued_at, after)
)
self.assertTrue(
before <= job.started_at <= after,
'Not %s <= %s <= %s' % (before, job.started_at, after)
)
self.assertTrue(
before <= job.ended_at <= after,
'Not %s <= %s <= %s' % (before, job.ended_at, after)
)
def test_work_is_unreadable(self): def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue.""" """Unreadable jobs are put on the failed queue."""
@ -557,14 +577,16 @@ def kill_worker(pid, double_kill):
class TestWorkerShutdown(RQTestCase): class TestWorkerShutdown(RQTestCase):
def setUp(self): def setUp(self):
# we want tests to fail if signal are ignored and the work remain running, # we want tests to fail if signal are ignored and the work remain
# so set a signal to kill them after 5 seconds # running, so set a signal to kill them after X seconds
self.killtimeout = 10
signal.signal(signal.SIGALRM, self._timeout) signal.signal(signal.SIGALRM, self._timeout)
signal.alarm(5) signal.alarm(self.killtimeout)
def _timeout(self, signal, frame): def _timeout(self, signal, frame):
raise AssertionError("test still running after 5 seconds, " raise AssertionError(
"likely the worker wasn't shutdown correctly") "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout
)
@slow @slow
def test_idle_worker_warm_shutdown(self): def test_idle_worker_warm_shutdown(self):
@ -621,3 +643,35 @@ class TestWorkerShutdown(RQTestCase):
shutdown_requested_date = w.shutdown_requested_date shutdown_requested_date = w.shutdown_requested_date
self.assertIsNotNone(shutdown_requested_date) self.assertIsNotNone(shutdown_requested_date)
self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
def schedule_access_self():
q = Queue('default', connection=get_current_connection())
q.enqueue(access_self)
class TestWorkerSubprocess(RQTestCase):
def setUp(self):
super(TestWorkerSubprocess, self).setUp()
db_num = self.testconn.connection_pool.connection_kwargs['db']
self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
def test_run_empty_queue(self):
"""Run the worker in its own process with an empty queue"""
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
def test_run_access_self(self):
"""Schedule a job, then run the worker as subprocess"""
q = Queue()
q.enqueue(access_self)
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
assert get_failed_queue().count == 0
assert q.count == 0
def test_run_scheduled_access_self(self):
"""Schedule a job that schedules a job, then run the worker as subprocess"""
q = Queue()
q.enqueue(schedule_access_self)
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
assert get_failed_queue().count == 0
assert q.count == 0

@ -2,7 +2,7 @@
envlist=py26,py27,py33,py34,py35,pypy,flake8 envlist=py26,py27,py33,py34,py35,pypy,flake8
[testenv] [testenv]
commands=py.test --cov rq {posargs} commands=py.test --cov rq --durations=5 {posargs}
deps= deps=
pytest pytest
pytest-cov pytest-cov

Loading…
Cancel
Save