diff --git a/rq/job.py b/rq/job.py index bf55661..5681e66 100644 --- a/rq/job.py +++ b/rq/job.py @@ -12,8 +12,12 @@ JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', 'result', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'result_ttl', '_status', 'status']) -Status = namedtuple('Status', ('queued', 'finished', 'failed')) -STATUS = Status(queued='queued', finished='finished', failed='failed') + +def enum(name, *sequential, **named): + values = dict(zip(sequential, range(len(sequential))), **named) + return type(name, (), values) + +Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed') def unpickle(pickled_string): @@ -51,8 +55,6 @@ def requeue_job(job_id, connection=None): class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ - STATUS = STATUS - # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, diff --git a/rq/queue.py b/rq/queue.py index f9cfff0..6dca8e1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,6 +1,6 @@ import times from .connections import resolve_connection -from .job import Job +from .job import Job, Status from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -116,7 +116,7 @@ class Queue(object): """ timeout = timeout or self._default_timeout job = Job.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Job.STATUS.queued) + result_ttl=result_ttl, status=Status.QUEUED) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 4ff7888..3ba1d6c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,6 +14,7 @@ import logging from cPickle import dumps from .queue import Queue, get_failed_queue from .connections import get_current_connection +from .job import Status from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError from .timeouts import death_penalty_after @@ -384,12 +385,12 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails pickled_rv = dumps(rv) - job._status = job.STATUS.finished + job._status = Status.FINISHED except Exception as e: fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) - job._status = job.STATUS.failed + job._status = Status.FAILED fq.quarantine(job, exc_info=traceback.format_exc()) return False @@ -403,9 +404,9 @@ class Worker(object): How long we persist the job result depends on the value of result_ttl: - If result_ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist forever) + - If result_ttl is negative, don't set an expiry to it (persist forever) """ - result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl + result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl if result_ttl == 0: job.delete() else: diff --git a/tests/test_queue.py b/tests/test_queue.py index 3d455c2..4a88f65 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,7 +1,7 @@ from tests import RQTestCase from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation from rq import Queue, get_failed_queue -from rq.job import Job +from rq.job import Job, Status from rq.exceptions import InvalidJobOperationError @@ -209,7 +209,7 @@ class TestQueue(RQTestCase): """Enqueueing a job sets its status to "queued".""" q = Queue() job = q.enqueue(say_hello) - self.assertEqual(job.status, Job.STATUS.queued) + self.assertEqual(job.status, Status.QUEUED) class TestFailedQueue(RQTestCase): @@ -271,4 +271,4 @@ class TestFailedQueue(RQTestCase): """Executes a job immediately if async=False.""" q = Queue(async=False) job = q.enqueue(some_calculation, args=(2, 3)) - self.assertEqual(job.return_value, 6) \ No newline at end of file + self.assertEqual(job.return_value, 6) diff --git a/tests/test_worker.py b/tests/test_worker.py index 91d1589..dfe5261 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ create_file_after_timeout from tests.helpers import strip_milliseconds from rq import Queue, Worker, get_failed_queue -from rq.job import Job +from rq.job import Job, Status class TestWorker(RQTestCase): @@ -175,14 +175,14 @@ class TestWorker(RQTestCase): """Ensure that worker correctly sets job status.""" q = Queue() w = Worker([q]) - + job = q.enqueue(say_hello) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, job.STATUS.finished) - + self.assertEqual(job.status, Status.FINISHED) + # Failed jobs should set status to "failed" job = q.enqueue(div_by_zero, args=(1,)) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, job.STATUS.failed) \ No newline at end of file + self.assertEqual(job.status, Status.FAILED)