|
|
|
@ -3,10 +3,11 @@ from __future__ import (absolute_import, division, print_function,
|
|
|
|
|
unicode_literals)
|
|
|
|
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
import warnings
|
|
|
|
|
|
|
|
|
|
from rq.compat import as_text, PY2
|
|
|
|
|
from rq.exceptions import NoSuchJobError, UnpickleError
|
|
|
|
|
from rq.job import get_current_job, Job
|
|
|
|
|
from rq.job import get_current_job, Job, Status
|
|
|
|
|
from rq.queue import Queue
|
|
|
|
|
from rq.utils import utcformat
|
|
|
|
|
|
|
|
|
@ -327,3 +328,21 @@ class TestJob(RQTestCase):
|
|
|
|
|
job.cancel()
|
|
|
|
|
self.assertFalse(self.testconn.exists(job.key))
|
|
|
|
|
self.assertFalse(self.testconn.exists(job.dependents_key))
|
|
|
|
|
|
|
|
|
|
def test_get_and_set_job_status(self):
|
|
|
|
|
"""job.get_status and set_status have same functionality as property"""
|
|
|
|
|
job = Job.create(func=say_hello)
|
|
|
|
|
job2 = Job.create(func=say_hello)
|
|
|
|
|
|
|
|
|
|
with warnings.catch_warnings(record=True) as w:
|
|
|
|
|
warnings.simplefilter("always")
|
|
|
|
|
job.set_status(Status.FINISHED)
|
|
|
|
|
job2.status = Status.FINISHED
|
|
|
|
|
self.assertEqual(len(w), 1)
|
|
|
|
|
self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
|
|
|
|
|
|
|
|
|
|
with warnings.catch_warnings(record=True) as w:
|
|
|
|
|
warnings.simplefilter("always")
|
|
|
|
|
self.assertEqual(job.get_status(), job2.status)
|
|
|
|
|
self.assertEqual(len(w), 1)
|
|
|
|
|
self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
|
|
|
|
|