Make test cases more explicit

main
Vincent Driessen 10 years ago
parent 82df2ee689
commit df4d4c8d5d

@ -54,8 +54,7 @@ def create_file_after_timeout(path, timeout):
def access_self():
job = get_current_job()
return job.id
assert get_current_job() is not None
def echo(*args, **kwargs):

@ -4,17 +4,18 @@ from __future__ import (absolute_import, division, print_function,
from datetime import datetime
from rq.compat import as_text, PY2
from tests import RQTestCase
from tests.fixtures import (CallableObject, Number, access_self,
long_running_job, say_hello, some_calculation)
from tests.helpers import strip_microseconds
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import get_current_job, Job
from rq.job import Job, get_current_job
from rq.queue import Queue
from rq.registry import DeferredJobRegistry
from rq.utils import utcformat
from tests import RQTestCase
from tests.fixtures import (access_self, CallableObject, Number, say_hello,
some_calculation, long_running_job)
from tests.helpers import strip_microseconds
from rq.worker import Worker
try:
from cPickle import loads, dumps
@ -291,25 +292,20 @@ class TestJob(RQTestCase):
else:
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
# Executing the job function from outside of RQ throws an exception
def test_job_access_outside_job_fails(self):
"""The current job is accessible only within a job context."""
self.assertIsNone(get_current_job())
# Executing the job function from within the job works (and in
# this case leads to the job ID being returned)
job = Job.create(func=access_self)
job.save()
id = job.perform()
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
q = Queue()
q.enqueue(access_self) # access_self calls get_current_job() and asserts
w = Worker([q])
w.work(burst=True)
# Ensure that get_current_job also works from within synchronous jobs
def test_job_access_within_synchronous_job_function(self):
queue = Queue(async=False)
job = queue.enqueue(access_self)
id = job.perform()
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
queue.enqueue(access_self)
def test_get_result_ttl(self):
"""Getting job result TTL."""

Loading…
Cancel
Save