Move job from Worker to Job test suite.

main
Vincent Driessen 13 years ago
parent 15342f14d3
commit 2b6101d110

@ -1,7 +1,7 @@
import times
from datetime import datetime
from tests import RQTestCase
from tests.fixtures import some_calculation
from tests.fixtures import some_calculation, say_hello
from tests.helpers import strip_milliseconds
from cPickle import loads
from rq import Job
@ -137,3 +137,19 @@ class TestJob(RQTestCase):
self.testconn.hset(job.key, 'data', 'this is no pickle string')
with self.assertRaises(UnpickleError):
job.refresh()
def test_job_is_unimportable(self):
"""Jobs that cannot be imported throw exception on access."""
job = Job.create(say_hello, 'Lionel')
job.save()
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('say_hello', 'shut_up')
self.testconn.hset(job.key, 'data', unimportable_data)
job.refresh()
with self.assertRaises(AttributeError):
job.func # accessing the func property should fail

@ -54,24 +54,6 @@ class TestWorker(RQTestCase):
self.assertEquals(q.count, 0)
self.assertEquals(failed_q.count, 1)
def test_work_is_unimportable(self):
"""Jobs that cannot be imported are put on the failed queue."""
q = Queue()
job = q.enqueue(say_hello, 'Lionel')
job.save()
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
data = self.testconn.hget(job.key, 'data')
unimportable_data = data.replace('say_hello', 'shut_up')
self.testconn.hset(job.key, 'data', unimportable_data)
job.refresh()
with self.assertRaises((ImportError, AttributeError)):
job.func # accessing the func property should fail
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
q = Queue()

Loading…
Cancel
Save