Organize test fixtures into a separate file.

main
Vincent Driessen 13 years ago
parent 5717a0ba15
commit f07d28db86

@ -3,17 +3,6 @@ from redis import Redis
from logbook import NullHandler from logbook import NullHandler
from rq import conn from rq import conn
# Test data
def testjob(name=None):
if name is None:
name = 'Stranger'
return 'Hi there, %s!' % (name,)
def failing_job(x):
# Will throw a division-by-zero error
return x / 0
def find_empty_redis_database(): def find_empty_redis_database():
"""Tries to connect to a random Redis database (starting from 4), and """Tries to connect to a random Redis database (starting from 4), and
will use/connect it when no keys are in there. will use/connect it when no keys are in there.

@ -0,0 +1,32 @@
"""
This file contains all jobs that are used in tests. Each of these test
fixtures has a slighty different characteristics.
"""
def say_hello(name=None):
"""A job with a single argument and a return value."""
if name is None:
name = 'Stranger'
return 'Hi there, %s!' % (name,)
def do_nothing():
"""The best job in the world."""
pass
def div_by_zero(x):
"""Prepare for a division-by-zero exception."""
return x / 0
def some_calculation(x, y, z=1):
"""Some arbitrary calculation with three numbers. Choose z smartly if you
want a division by zero exception.
"""
return x * y / z
def create_file(path):
"""Creates a file at the given path. Actually, leaves evidence that the
job ran."""
with open(path, 'w') as f:
f.write('Just a sentinel.')

@ -1,16 +1,13 @@
import times import times
from datetime import datetime from datetime import datetime
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import some_calculation
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from cPickle import loads from cPickle import loads
from rq import Job from rq import Job
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
def arbitrary_function(x, y, z=1):
return x * y / z
class TestJob(RQTestCase): class TestJob(RQTestCase):
def test_create_empty_job(self): def test_create_empty_job(self):
"""Creation of new empty jobs.""" """Creation of new empty jobs."""
@ -32,7 +29,7 @@ class TestJob(RQTestCase):
def test_create_typical_job(self): def test_create_typical_job(self):
"""Creation of jobs for function calls.""" """Creation of jobs for function calls."""
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
# Jobs have a random UUID # Jobs have a random UUID
self.assertIsNotNone(job.id) self.assertIsNotNone(job.id)
@ -40,7 +37,7 @@ class TestJob(RQTestCase):
self.assertIsNotNone(job.description) self.assertIsNotNone(job.description)
# Job data is set... # Job data is set...
self.assertEquals(job.func, arbitrary_function) self.assertEquals(job.func, some_calculation)
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, {'z': 2}) self.assertEquals(job.kwargs, {'z': 2})
@ -52,7 +49,7 @@ class TestJob(RQTestCase):
def test_save(self): # noqa def test_save(self): # noqa
"""Storing jobs.""" """Storing jobs."""
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
# Saving creates a Redis hash # Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False) self.assertEquals(self.testconn.exists(job.key), False)
@ -61,20 +58,20 @@ class TestJob(RQTestCase):
# Saving writes pickled job data # Saving writes pickled job data
unpickled_data = loads(self.testconn.hget(job.key, 'data')) unpickled_data = loads(self.testconn.hget(job.key, 'data'))
self.assertEquals(unpickled_data[0], arbitrary_function) self.assertEquals(unpickled_data[0], some_calculation)
def test_fetch(self): def test_fetch(self):
"""Fetching jobs.""" """Fetching jobs."""
# Prepare test # Prepare test
self.testconn.hset('rq:job:some_id', 'data', self.testconn.hset('rq:job:some_id', 'data',
"(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa "(ctest_job\nsome_calculation\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") # noqa
self.testconn.hset('rq:job:some_id', 'created_at', self.testconn.hset('rq:job:some_id', 'created_at',
"2012-02-07 22:13:24+0000") "2012-02-07 22:13:24+0000")
# Fetch returns a job # Fetch returns a job
job = Job.fetch('some_id') job = Job.fetch('some_id')
self.assertEquals(job.id, 'some_id') self.assertEquals(job.id, 'some_id')
self.assertEquals(job.func, arbitrary_function) self.assertEquals(job.func, some_calculation)
self.assertEquals(job.args, (3, 4)) self.assertEquals(job.args, (3, 4))
self.assertEquals(job.kwargs, dict(z=2)) self.assertEquals(job.kwargs, dict(z=2))
self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24))
@ -98,7 +95,7 @@ class TestJob(RQTestCase):
def test_persistence_of_typical_jobs(self): def test_persistence_of_typical_jobs(self):
"""Storing typical jobs.""" """Storing typical jobs."""
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
expected_date = strip_milliseconds(job.created_at) expected_date = strip_milliseconds(job.created_at)
@ -113,7 +110,7 @@ class TestJob(RQTestCase):
['created_at', 'data', 'description']) ['created_at', 'data', 'description'])
def test_store_then_fetch(self): def test_store_then_fetch(self):
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
@ -132,7 +129,7 @@ class TestJob(RQTestCase):
def test_fetching_unreadable_data(self): def test_fetching_unreadable_data(self):
"""Fetching fails on unreadable data.""" """Fetching fails on unreadable data."""
# Set up # Set up
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
# Just replace the data hkey with some random noise # Just replace the data hkey with some random noise
@ -141,14 +138,14 @@ class TestJob(RQTestCase):
job.refresh() job.refresh()
# Set up (part B) # Set up (part B)
job = Job.create(arbitrary_function, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
# Now slightly modify the job to make it unpickl'able (this is # Now slightly modify the job to make it unpickl'able (this is
# equivalent to a worker not having the most up-to-date source code and # equivalent to a worker not having the most up-to-date source code and
# unable to import the function) # unable to import the function)
data = self.testconn.hget(job.key, 'data') data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('arbitrary_function', 'broken') unimportable_data = data.replace('some_calculation', 'broken')
self.testconn.hset(job.key, 'data', unimportable_data) self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
job.refresh() job.refresh()

@ -1,5 +1,5 @@
from tests import RQTestCase from tests import RQTestCase
from tests import testjob, failing_job from tests.fixtures import say_hello, div_by_zero
from rq import Queue, FailedQueue, Job from rq import Queue, FailedQueue, Job
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
@ -53,10 +53,10 @@ class TestQueue(RQTestCase):
"""Compacting queueus.""" """Compacting queueus."""
q = Queue() q = Queue()
q.enqueue(testjob, 'Alice') q.enqueue(say_hello, 'Alice')
bob = q.enqueue(testjob, 'Bob') bob = q.enqueue(say_hello, 'Bob')
q.enqueue(testjob, 'Charlie') q.enqueue(say_hello, 'Charlie')
debrah = q.enqueue(testjob, 'Debrah') debrah = q.enqueue(say_hello, 'Debrah')
bob.cancel() bob.cancel()
debrah.cancel() debrah.cancel()
@ -73,8 +73,8 @@ class TestQueue(RQTestCase):
q = Queue() q = Queue()
self.assertEquals(q.is_empty(), True) self.assertEquals(q.is_empty(), True)
# testjob spec holds which queue this is sent to # say_hello spec holds which queue this is sent to
job = q.enqueue(testjob, 'Nick', foo='bar') job = q.enqueue(say_hello, 'Nick', foo='bar')
job_id = job.id job_id = job.id
# Inspect data inside Redis # Inspect data inside Redis
@ -85,7 +85,7 @@ class TestQueue(RQTestCase):
def test_enqueue_sets_metadata(self): def test_enqueue_sets_metadata(self):
"""Enqueueing job onto queues modifies meta data.""" """Enqueueing job onto queues modifies meta data."""
q = Queue() q = Queue()
job = Job.create(testjob, 'Nick', foo='bar') job = Job.create(say_hello, 'Nick', foo='bar')
# Preconditions # Preconditions
self.assertIsNone(job.origin) self.assertIsNone(job.origin)
@ -117,13 +117,13 @@ class TestQueue(RQTestCase):
"""Dequeueing jobs from queues.""" """Dequeueing jobs from queues."""
# Set up # Set up
q = Queue() q = Queue()
result = q.enqueue(testjob, 'Rick', foo='bar') result = q.enqueue(say_hello, 'Rick', foo='bar')
# Dequeue a job (not a job ID) off the queue # Dequeue a job (not a job ID) off the queue
self.assertEquals(q.count, 1) self.assertEquals(q.count, 1)
job = q.dequeue() job = q.dequeue()
self.assertEquals(job.id, result.id) self.assertEquals(job.id, result.id)
self.assertEquals(job.func, testjob) self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, q.name) self.assertEquals(job.origin, q.name)
self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.args[0], 'Rick')
self.assertEquals(job.kwargs['foo'], 'bar') self.assertEquals(job.kwargs['foo'], 'bar')
@ -138,7 +138,7 @@ class TestQueue(RQTestCase):
uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
q.push_job_id(uuid) q.push_job_id(uuid)
q.push_job_id(uuid) q.push_job_id(uuid)
result = q.enqueue(testjob, 'Nick', foo='bar') result = q.enqueue(say_hello, 'Nick', foo='bar')
q.push_job_id(uuid) q.push_job_id(uuid)
# Dequeue simply ignores the missing job and returns None # Dequeue simply ignores the missing job and returns None
@ -155,25 +155,25 @@ class TestQueue(RQTestCase):
self.assertEquals(Queue.dequeue_any([fooq, barq], False), None) self.assertEquals(Queue.dequeue_any([fooq, barq], False), None)
# Enqueue a single item # Enqueue a single item
barq.enqueue(testjob) barq.enqueue(say_hello)
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(job.func, testjob) self.assertEquals(job.func, say_hello)
self.assertEquals(queue, barq) self.assertEquals(queue, barq)
# Enqueue items on both queues # Enqueue items on both queues
barq.enqueue(testjob, 'for Bar') barq.enqueue(say_hello, 'for Bar')
fooq.enqueue(testjob, 'for Foo') fooq.enqueue(say_hello, 'for Foo')
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(queue, fooq) self.assertEquals(queue, fooq)
self.assertEquals(job.func, testjob) self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, fooq.name) self.assertEquals(job.origin, fooq.name)
self.assertEquals(job.args[0], 'for Foo', self.assertEquals(job.args[0], 'for Foo',
'Foo should be dequeued first.') 'Foo should be dequeued first.')
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(queue, barq) self.assertEquals(queue, barq)
self.assertEquals(job.func, testjob) self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, barq.name) self.assertEquals(job.origin, barq.name)
self.assertEquals(job.args[0], 'for Bar', self.assertEquals(job.args[0], 'for Bar',
'Bar should be dequeued second.') 'Bar should be dequeued second.')
@ -195,7 +195,7 @@ class TestQueue(RQTestCase):
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):
"""Requeueing existing jobs.""" """Requeueing existing jobs."""
job = Job.create(failing_job, 1, 2, 3) job = Job.create(div_by_zero, 1, 2, 3)
job.origin = 'fake' job.origin = 'fake'
job.save() job.save()
FailedQueue().quarantine(job, Exception('Some fake error')) FailedQueue().quarantine(job, Exception('Some fake error'))
@ -211,7 +211,7 @@ class TestFailedQueue(RQTestCase):
def test_requeue_nonfailed_job_fails(self): def test_requeue_nonfailed_job_fails(self):
"""Requeueing non-failed jobs raises error.""" """Requeueing non-failed jobs raises error."""
q = Queue() q = Queue()
job = q.enqueue(testjob, 'Nick', foo='bar') job = q.enqueue(say_hello, 'Nick', foo='bar')
# Assert that we cannot requeue a job that's not on the failed queue # Assert that we cannot requeue a job that's not on the failed queue
with self.assertRaises(InvalidJobOperationError): with self.assertRaises(InvalidJobOperationError):

@ -1,19 +1,10 @@
import os import os
from tests import RQTestCase from tests import RQTestCase
from tests import testjob, failing_job from tests.fixtures import say_hello, div_by_zero, create_file
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from rq import Queue, Worker, Job from rq import Queue, Worker, Job
SENTINEL_FILE = '/tmp/rq-tests.txt'
def create_sentinel():
# Create some evidence that the job ran
with open(SENTINEL_FILE, 'w') as f:
f.write('Just a sentinel.')
class TestWorker(RQTestCase): class TestWorker(RQTestCase):
def test_create_worker(self): def test_create_worker(self):
"""Worker creation.""" """Worker creation."""
@ -27,7 +18,7 @@ class TestWorker(RQTestCase):
w = Worker([fooq, barq]) w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.') self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.')
fooq.enqueue(testjob, name='Frank') fooq.enqueue(say_hello, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
def test_work_is_unreadable(self): def test_work_is_unreadable(self):
@ -41,7 +32,7 @@ class TestWorker(RQTestCase):
# NOTE: We have to fake this enqueueing for this test case. # NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not # What we're simulating here is a call to a function that is not
# importable from the worker process. # importable from the worker process.
job = Job.create(failing_job, 3) job = Job.create(div_by_zero, 3)
job.save() job.save()
data = self.testconn.hget(job.key, 'data') data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace('failing_job', 'nonexisting_job') invalid_data = data.replace('failing_job', 'nonexisting_job')
@ -70,7 +61,7 @@ class TestWorker(RQTestCase):
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
# Action # Action
job = q.enqueue(failing_job) job = q.enqueue(div_by_zero)
self.assertEquals(q.count, 1) self.assertEquals(q.count, 1)
# keep for later # keep for later
@ -95,6 +86,9 @@ class TestWorker(RQTestCase):
def test_cancelled_jobs_arent_executed(self): def test_cancelled_jobs_arent_executed(self):
"""Cancelling jobs.""" """Cancelling jobs."""
SENTINEL_FILE = '/tmp/rq-tests.txt'
try: try:
# Remove the sentinel if it is leftover from a previous test run # Remove the sentinel if it is leftover from a previous test run
os.remove(SENTINEL_FILE) os.remove(SENTINEL_FILE)
@ -103,7 +97,7 @@ class TestWorker(RQTestCase):
raise raise
q = Queue() q = Queue()
result = q.enqueue(create_sentinel) result = q.enqueue(create_file, SENTINEL_FILE)
# Here, we cancel the job, so the sentinel file may not be created # Here, we cancel the job, so the sentinel file may not be created
assert q.count == 1 assert q.count == 1

Loading…
Cancel
Save