diff --git a/tests/fixtures.py b/tests/fixtures.py index 4271373..a2bbefb 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -54,8 +54,7 @@ def create_file_after_timeout(path, timeout): def access_self(): - job = get_current_job() - return job.id + assert get_current_job() is not None def echo(*args, **kwargs): diff --git a/tests/test_job.py b/tests/test_job.py index 0f6ab30..0ce142e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,17 +4,18 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime -from rq.compat import as_text, PY2 +from tests import RQTestCase +from tests.fixtures import (CallableObject, Number, access_self, + long_running_job, say_hello, some_calculation) +from tests.helpers import strip_microseconds + +from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError -from rq.job import get_current_job, Job +from rq.job import Job, get_current_job from rq.queue import Queue from rq.registry import DeferredJobRegistry from rq.utils import utcformat - -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation, long_running_job) -from tests.helpers import strip_microseconds +from rq.worker import Worker try: from cPickle import loads, dumps @@ -291,25 +292,20 @@ class TestJob(RQTestCase): else: self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')") - def test_job_access_within_job_function(self): - """The current job is accessible within the job function.""" - # Executing the job function from outside of RQ throws an exception + def test_job_access_outside_job_fails(self): + """The current job is accessible only within a job context.""" self.assertIsNone(get_current_job()) - # Executing the job function from within the job works (and in - # this case leads to the job ID being returned) - job = Job.create(func=access_self) - job.save() - id = job.perform() - self.assertEqual(job.id, id) - self.assertEqual(job.func, access_self) + def test_job_access_within_job_function(self): + """The current job is accessible within the job function.""" + q = Queue() + q.enqueue(access_self) # access_self calls get_current_job() and asserts + w = Worker([q]) + w.work(burst=True) - # Ensure that get_current_job also works from within synchronous jobs + def test_job_access_within_synchronous_job_function(self): queue = Queue(async=False) - job = queue.enqueue(access_self) - id = job.perform() - self.assertEqual(job.id, id) - self.assertEqual(job.func, access_self) + queue.enqueue(access_self) def test_get_result_ttl(self): """Getting job result TTL."""