Merge pull request #876 from theodesp/Issue-866

Issue 866
main
Selwin Ong 7 years ago committed by GitHub
commit fa6c28bf3c

@ -67,18 +67,18 @@ else:
# Python 2.x # Python 2.x
def text_type(v): def text_type(v):
try: try:
return unicode(v) return unicode(v) # noqa
except Exception: except Exception:
return unicode(v, "utf-8", errors="ignore") return unicode(v, "utf-8", errors="ignore") # noqa
string_types = (str, unicode) string_types = (str, unicode) # noqa
def as_text(v): def as_text(v):
if v is None: if v is None:
return None return None
elif isinstance(v, str): elif isinstance(v, str):
return v.decode('utf-8') return v.decode('utf-8')
elif isinstance(v, unicode): elif isinstance(v, unicode): # noqa
return v return v
else: else:
raise Exception("Input cannot be decoded into literal thing.") raise Exception("Input cannot be decoded into literal thing.")

@ -15,7 +15,7 @@ class NoRedisConnectionException(Exception):
@contextmanager @contextmanager
def Connection(connection=None): def Connection(connection=None): # noqa
if connection is None: if connection is None:
connection = StrictRedis() connection = StrictRedis()
push_connection(connection) push_connection(connection)

@ -11,7 +11,7 @@ from .queue import Queue
from .utils import backend_class from .utils import backend_class
class job(object): class job(object): # noqa
queue_class = Queue queue_class = Queue
def __init__(self, queue, connection=None, timeout=None, def __init__(self, queue, connection=None, timeout=None,

@ -121,7 +121,8 @@ class Queue(object):
except NoSuchJobError: except NoSuchJobError:
self.remove(job_id) self.remove(job_id)
else: else:
if job.origin == self.name or (job.is_failed and self == get_failed_queue(connection=self.connection, job_class=self.job_class)): if job.origin == self.name or \
(job.is_failed and self == get_failed_queue(connection=self.connection, job_class=self.job_class)):
return job return job
def get_job_ids(self, offset=0, length=-1): def get_job_ids(self, offset=0, length=-1):
@ -168,7 +169,7 @@ class Queue(object):
"""Removes all "dead" jobs from the queue by cycling through it, while """Removes all "dead" jobs from the queue by cycling through it, while
guaranteeing FIFO semantics. guaranteeing FIFO semantics.
""" """
COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4()) COMPACT_QUEUE = 'rq:queue:_compact:{0}'.format(uuid.uuid4()) # noqa
self.connection.rename(self.key, COMPACT_QUEUE) self.connection.rename(self.key, COMPACT_QUEUE)
while True: while True:

@ -469,13 +469,13 @@ class Worker(object):
if burst: if burst:
self.log.info("RQ worker {0!r} done, quitting".format(self.key)) self.log.info("RQ worker {0!r} done, quitting".format(self.key))
break break
job, queue = result job, queue = result
self.execute_job(job, queue) self.execute_job(job, queue)
self.heartbeat() self.heartbeat()
did_perform_work = True did_perform_work = True
except StopRequested: except StopRequested:
break break
finally: finally:

@ -70,7 +70,7 @@ class RQTestCase(unittest.TestCase):
# Implement assertIsNotNone for Python runtimes < 2.7 or < 3.1 # Implement assertIsNotNone for Python runtimes < 2.7 or < 3.1
if not hasattr(unittest.TestCase, 'assertIsNotNone'): if not hasattr(unittest.TestCase, 'assertIsNotNone'):
def assertIsNotNone(self, value, *args): def assertIsNotNone(self, value, *args): # noqa
self.assertNotEqual(value, None, *args) self.assertNotEqual(value, None, *args)
@classmethod @classmethod

@ -29,7 +29,7 @@ def say_hello(name=None):
def say_hello_unicode(name=None): def say_hello_unicode(name=None):
"""A job with a single argument and a return value.""" """A job with a single argument and a return value."""
return unicode(say_hello(name)) return unicode(say_hello(name)) # noqa
def do_nothing(): def do_nothing():
@ -80,7 +80,7 @@ def modify_self_and_error(meta):
def echo(*args, **kwargs): def echo(*args, **kwargs):
return (args, kwargs) return args, kwargs
class Number(object): class Number(object):

@ -6,6 +6,7 @@ from datetime import datetime
import time import time
import sys import sys
is_py2 = sys.version[0] == '2' is_py2 = sys.version[0] == '2'
if is_py2: if is_py2:
import Queue as queue import Queue as queue
@ -19,7 +20,6 @@ from rq.compat import PY2
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job
from rq.queue import Queue, get_failed_queue from rq.queue import Queue, get_failed_queue
from rq.registry import DeferredJobRegistry
from rq.utils import utcformat from rq.utils import utcformat
from rq.worker import Worker from rq.worker import Worker

@ -655,11 +655,11 @@ class TestFailedQueue(RQTestCase):
"""Make sure failed job key does not expire""" """Make sure failed job key does not expire"""
q = Queue('foo') q = Queue('foo')
job = q.enqueue(div_by_zero, args=(1,), ttl=5) job = q.enqueue(div_by_zero, args=(1,), ttl=5)
self.assertEqual(self.testconn.ttl(job.key), 5) self.assertEqual(self.testconn.ttl(job.key), 5)
self.assertRaises(ZeroDivisionError, job.perform) self.assertRaises(ZeroDivisionError, job.perform)
job.set_status(JobStatus.FAILED) job.set_status(JobStatus.FAILED)
failed_queue = get_failed_queue() failed_queue = get_failed_queue()
failed_queue.quarantine(job, Exception('Some fake error')) failed_queue.quarantine(job, Exception('Some fake error'))
self.assertEqual(self.testconn.ttl(job.key), -1) self.assertEqual(self.testconn.ttl(job.key), -1)

@ -11,7 +11,7 @@ from tests import RQTestCase
class FakeSentry(object): class FakeSentry(object):
servers = [] servers = []
def captureException(self, *args, **kwds): def captureException(self, *args, **kwds): # noqa
pass # we cannot check this, because worker forks pass # we cannot check this, because worker forks

@ -259,10 +259,10 @@ class TestWorker(RQTestCase):
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEqual(job.is_failed, True) self.assertEqual(job.is_failed, True)
def test_cancelled_jobs_arent_executed(self): # noqa def test_cancelled_jobs_arent_executed(self):
"""Cancelling jobs.""" """Cancelling jobs."""
SENTINEL_FILE = '/tmp/rq-tests.txt' SENTINEL_FILE = '/tmp/rq-tests.txt' # noqa
try: try:
# Remove the sentinel if it is leftover from a previous test run # Remove the sentinel if it is leftover from a previous test run
@ -471,19 +471,19 @@ class TestWorker(RQTestCase):
logging work properly""" logging work properly"""
q = Queue("foo") q = Queue("foo")
w = Worker([q]) w = Worker([q])
job = q.enqueue('tests.fixtures.say_hello', name='阿达姆', q.enqueue('tests.fixtures.say_hello', name='阿达姆',
description='你好 世界!') description='你好 世界!')
self.assertEqual(w.work(burst=True), True, self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.') 'Expected at least some work done.')
job = q.enqueue('tests.fixtures.say_hello_unicode', name='阿达姆', q.enqueue('tests.fixtures.say_hello_unicode', name='阿达姆',
description='你好 世界!') description='你好 世界!')
self.assertEqual(w.work(burst=True), True, self.assertEqual(w.work(burst=True), True,
'Expected at least some work done.') 'Expected at least some work done.')
def test_suspend_worker_execution(self): def test_suspend_worker_execution(self):
"""Test Pause Worker Execution""" """Test Pause Worker Execution"""
SENTINEL_FILE = '/tmp/rq-tests.txt' SENTINEL_FILE = '/tmp/rq-tests.txt' # noqa
try: try:
# Remove the sentinel if it is leftover from a previous test run # Remove the sentinel if it is leftover from a previous test run
@ -919,7 +919,7 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w = HerokuWorker('foo') w = HerokuWorker('foo')
w._horse_pid = 19999 w._horse_pid = 19999
w.handle_warm_shutdown_request() w.handle_warm_shutdown_request()
class TestExceptionHandlerMessageEncoding(RQTestCase): class TestExceptionHandlerMessageEncoding(RQTestCase):

Loading…
Cancel
Save