You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

68 lines
1.8 KiB
Python

import unittest
from redis import Redis
from logbook import NullHandler
from rq import conn
# Test data
def testjob(name=None):
    """Return a greeting for ``name``; a trivial function used as test data.

    :param name: who to greet. ``None`` (the default) greets ``'Stranger'``.
    :returns: the string ``'Hi there, <name>!'``.
    """
    if name is None:
        name = 'Stranger'
    # The one-element tuple keeps %-formatting correct even when ``name`` is
    # itself a tuple (it is then rendered as a single value).
    return 'Hi there, %s!' % (name,)
def failing_job(x):
    """Divide ``x`` by zero; test data standing in for a job that fails.

    :param x: the dividend; any value that supports ``/`` with an int.
    :raises ZeroDivisionError: for every numeric ``x``.
    """
    # The division by zero is deliberate: this function exists to fail.
    return x / 0
def find_empty_redis_database():
    """Return a connection to the first empty Redis database numbered 4-16.

    Databases are probed in ascending order (4, 5, ..., 16) with a client
    created as ``Redis(db=dbnum)``, i.e. using the client's default
    connection settings. The first database that holds no keys is returned.

    :returns: the ``Redis`` connection bound to the empty database.
    :raises AssertionError: if every probed database contains keys.
    """
    # NOTE(review): a stock Redis server is presumably configured with
    # databases 0-15, so index 16 would only exist when the ``databases``
    # setting has been raised -- confirm against the target server config.
    for dbnum in range(4, 17):
        testconn = Redis(db=dbnum)
        # Ask the server for the key count instead of fetching every key
        # with KEYS '*' just to measure the length of the result.
        if testconn.dbsize() == 0:
            return testconn

    # Raised explicitly (instead of ``assert False``) so that the failure is
    # still reported when Python runs with -O, which strips assert statements
    # and would otherwise let this function silently return None.
    raise AssertionError('No empty Redis database found to run tests in.')
class RQTestCase(unittest.TestCase):
    """Base class to inherit test cases from for RQ.

    It sets up the Redis connection (available via ``self.testconn``), turns
    off logging to the terminal and flushes the Redis database before and
    after running each test.

    NOTE(review): this docstring previously advertised an
    ``assertQueueContains(queue, that_func)`` assertion method, but no such
    method is defined in this class as visible here -- confirm whether it
    was removed or lives elsewhere.
    """

    @classmethod
    def setUpClass(cls):
        """Acquire an empty Redis database and silence logbook for the class."""
        # Set up connection to Redis
        testconn = find_empty_redis_database()
        conn.push(testconn)

        # Store the connection (for sanity checking in tearDownClass)
        cls.testconn = testconn

        # Shut up logbook
        cls.log_handler = NullHandler()
        cls.log_handler.push_thread()

    def setUp(self):
        """Flush the current Redis database before each test."""
        # Flush beforehand (we like our hygiene)
        conn.flushdb()

    def tearDown(self):
        """Flush the current Redis database after each test."""
        # Flush afterwards
        conn.flushdb()

    @classmethod
    def tearDownClass(cls):
        """Undo setUpClass: restore logging and pop the Redis connection."""
        cls.log_handler.pop_thread()

        # Pop the connection to Redis and verify that it is the very one
        # pushed in setUpClass, i.e. that the pushes and pops on the
        # connection stack stayed balanced while the tests ran.
        testconn = conn.pop()
        assert testconn == cls.testconn, (
            'Wow, something really nasty happened to the Redis connection '
            'stack. Check your setup.')