Broke down tests into multiple files.

main
Vincent Driessen 13 years ago
parent 210477c2ab
commit 1f64157c38

@ -0,0 +1,62 @@
import unittest
from pickle import loads
from redis import Redis
from logbook import NullHandler
from rq import conn
# Test data
def testjob(name=None):
if name is None:
name = 'Stranger'
return 'Hi there, %s!' % (name,)
class RQTestCase(unittest.TestCase):
"""Base class to inherit test cases from for RQ.
It sets up the Redis connection (available via self.testconn), turns off
logging to the terminal and flushes the Redis database before and after
running each test.
Also offers assertQueueContains(queue, that_func) assertion method.
"""
@classmethod
def setUpClass(cls):
# Set up connection to Redis
testconn = Redis()
conn.push(testconn)
# Store the connection (for sanity checking)
cls.testconn = testconn
# Shut up logbook
cls.log_handler = NullHandler()
cls.log_handler.push_thread()
def setUp(self):
# Flush beforewards (we like our hygiene)
conn.flushdb()
def tearDown(self):
# Flush afterwards
conn.flushdb()
@classmethod
def tearDownClass(cls):
cls.log_handler.pop_thread()
# Pop the connection to Redis
testconn = conn.pop()
assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
def assertQueueContains(self, queue, that_func):
# Do a queue scan (this is O(n), but we're in a test, so hey)
for message in queue.messages:
f, _, args, kwargs = loads(message)
if f == that_func:
return
self.fail('Queue %s does not contain message for function %s' %
(queue.key, that_func))

@ -0,0 +1,11 @@
from tests import RQTestCase
#from pickle import loads, dumps
#from rq import Queue, Worker
#from rq.exceptions import DequeueError
class TestJob(RQTestCase):
def test_create_job(self):
"""Creation of jobs."""
pass

@ -1,57 +1,9 @@
import unittest from tests import RQTestCase
from pickle import loads, dumps from tests import testjob
from redis import Redis from pickle import dumps
from logbook import NullHandler from rq import Queue
from rq import conn, Queue, Worker
from rq.exceptions import DequeueError from rq.exceptions import DequeueError
# Test data
def testjob(name=None):
if name is None:
name = 'Stranger'
return 'Hi there, %s!' % (name,)
class RQTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Set up connection to Redis
testconn = Redis()
conn.push(testconn)
# Store the connection (for sanity checking)
cls.testconn = testconn
# Shut up logbook
cls.log_handler = NullHandler()
cls.log_handler.push_thread()
def setUp(self):
# Flush beforewards (we like our hygiene)
conn.flushdb()
def tearDown(self):
# Flush afterwards
conn.flushdb()
@classmethod
def tearDownClass(cls):
cls.log_handler.pop_thread()
# Pop the connection to Redis
testconn = conn.pop()
assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
def assertQueueContains(self, queue, that_func):
# Do a queue scan (this is O(n), but we're in a test, so hey)
for message in queue.messages:
f, _, args, kwargs = loads(message)
if f == that_func:
return
self.fail('Queue %s does not contain message for function %s' %
(queue.key, that_func))
class TestQueue(RQTestCase): class TestQueue(RQTestCase):
def test_create_queue(self): def test_create_queue(self):
@ -82,7 +34,7 @@ class TestQueue(RQTestCase):
q = Queue('my-queue') q = Queue('my-queue')
self.assertEquals(q.empty, True) self.assertEquals(q.empty, True)
conn.rpush('rq:queue:my-queue', 'some val') self.testconn.rpush('rq:queue:my-queue', 'some val')
self.assertEquals(q.empty, False) self.assertEquals(q.empty, False)
@ -166,23 +118,3 @@ class TestQueue(RQTestCase):
with self.assertRaises(DequeueError): with self.assertRaises(DequeueError):
q.dequeue() # error occurs when dequeue()'ing q.dequeue() # error occurs when dequeue()'ing
class TestWorker(RQTestCase):
def test_create_worker(self):
"""Worker creation."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.queues, [fooq, barq])
def test_work_and_quit(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.')
fooq.enqueue(testjob, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,22 @@
from tests import RQTestCase
from tests import testjob
from rq import Queue, Worker
class TestWorker(RQTestCase):
def test_create_worker(self):
"""Worker creation."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.queues, [fooq, barq])
def test_work_and_quit(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.')
fooq.enqueue(testjob, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
Loading…
Cancel
Save