I like this implementation of an 'enum' better.

main
Vincent Driessen 13 years ago
parent 442b389b97
commit 4224304291

@ -12,8 +12,12 @@ JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result', 'result', 'created_at', 'enqueued_at', 'connection', '_result', 'result',
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance',
'result_ttl', '_status', 'status']) 'result_ttl', '_status', 'status'])
Status = namedtuple('Status', ('queued', 'finished', 'failed'))
STATUS = Status(queued='queued', finished='finished', failed='failed') def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)
return type(name, (), values)
Status = enum('Status', QUEUED='queued', FINISHED='finished', FAILED='failed')
def unpickle(pickled_string): def unpickle(pickled_string):
@ -51,8 +55,6 @@ def requeue_job(job_id, connection=None):
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
STATUS = STATUS
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,

@ -1,6 +1,6 @@
import times import times
from .connections import resolve_connection from .connections import resolve_connection
from .job import Job from .job import Job, Status
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
from .compat import total_ordering from .compat import total_ordering
@ -116,7 +116,7 @@ class Queue(object):
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Job.STATUS.queued) result_ttl=result_ttl, status=Status.QUEUED)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):

@ -14,6 +14,7 @@ import logging
from cPickle import dumps from cPickle import dumps
from .queue import Queue, get_failed_queue from .queue import Queue, get_failed_queue
from .connections import get_current_connection from .connections import get_current_connection
from .job import Status
from .utils import make_colorizer from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_penalty_after from .timeouts import death_penalty_after
@ -384,12 +385,12 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails # use the same exc handling when pickling fails
pickled_rv = dumps(rv) pickled_rv = dumps(rv)
job._status = job.STATUS.finished job._status = Status.FINISHED
except Exception as e: except Exception as e:
fq = self.failed_queue fq = self.failed_queue
self.log.exception(red(str(e))) self.log.exception(red(str(e)))
self.log.warning('Moving job to %s queue.' % fq.name) self.log.warning('Moving job to %s queue.' % fq.name)
job._status = job.STATUS.failed job._status = Status.FAILED
fq.quarantine(job, exc_info=traceback.format_exc()) fq.quarantine(job, exc_info=traceback.format_exc())
return False return False
@ -403,9 +404,9 @@ class Worker(object):
How long we persist the job result depends on the value of result_ttl: How long we persist the job result depends on the value of result_ttl:
- If result_ttl is 0, cleanup the job immediately. - If result_ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds. - If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist forever) - If result_ttl is negative, don't set an expiry to it (persist forever)
""" """
result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl
if result_ttl == 0: if result_ttl == 0:
job.delete() job.delete()
else: else:

@ -1,7 +1,7 @@
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation
from rq import Queue, get_failed_queue from rq import Queue, get_failed_queue
from rq.job import Job from rq.job import Job, Status
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
@ -209,7 +209,7 @@ class TestQueue(RQTestCase):
"""Enqueueing a job sets its status to "queued".""" """Enqueueing a job sets its status to "queued"."""
q = Queue() q = Queue()
job = q.enqueue(say_hello) job = q.enqueue(say_hello)
self.assertEqual(job.status, Job.STATUS.queued) self.assertEqual(job.status, Status.QUEUED)
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
@ -271,4 +271,4 @@ class TestFailedQueue(RQTestCase):
"""Executes a job immediately if async=False.""" """Executes a job immediately if async=False."""
q = Queue(async=False) q = Queue(async=False)
job = q.enqueue(some_calculation, args=(2, 3)) job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6) self.assertEqual(job.return_value, 6)

@ -4,7 +4,7 @@ from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout create_file_after_timeout
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from rq import Queue, Worker, get_failed_queue from rq import Queue, Worker, get_failed_queue
from rq.job import Job from rq.job import Job, Status
class TestWorker(RQTestCase): class TestWorker(RQTestCase):
@ -175,14 +175,14 @@ class TestWorker(RQTestCase):
"""Ensure that worker correctly sets job status.""" """Ensure that worker correctly sets job status."""
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
job = q.enqueue(say_hello) job = q.enqueue(say_hello)
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEqual(job.status, job.STATUS.finished) self.assertEqual(job.status, Status.FINISHED)
# Failed jobs should set status to "failed" # Failed jobs should set status to "failed"
job = q.enqueue(div_by_zero, args=(1,)) job = q.enqueue(div_by_zero, args=(1,))
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEqual(job.status, job.STATUS.failed) self.assertEqual(job.status, Status.FAILED)

Loading…
Cancel
Save