Job returning None as result are now persisted correctly.

Job status can now be checked via ``status`` property which should
return either "queued", "finished" or "failed".
main
Selwin Ong 13 years ago
parent d7f83cd727
commit 442b389b97

@ -1,6 +1,7 @@
import importlib import importlib
import inspect import inspect
import times import times
from collections import namedtuple
from uuid import uuid4 from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError from cPickle import loads, dumps, UnpicklingError
from .connections import get_current_connection from .connections import get_current_connection
@ -8,9 +9,11 @@ from .exceptions import UnpickleError, NoSuchJobError
JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result', 'created_at', 'enqueued_at', 'connection', '_result', 'result',
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance',
'result_ttl']) 'result_ttl', '_status', 'status'])
Status = namedtuple('Status', ('queued', 'finished', 'failed'))
STATUS = Status(queued='queued', finished='finished', failed='failed')
def unpickle(pickled_string): def unpickle(pickled_string):
@ -48,11 +51,12 @@ def requeue_job(job_id, connection=None):
class Job(object): class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data. """A Job is just a convenient datastructure to pass around job (meta) data.
""" """
STATUS = STATUS
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None): result_ttl=None, status=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -74,12 +78,17 @@ class Job(object):
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job._status = status
return job return job
@property @property
def func_name(self): def func_name(self):
return self._func_name return self._func_name
@property
def status(self):
return self._status
@property @property
def func(self): def func(self):
func_name = self.func_name func_name = self.func_name
@ -138,6 +147,7 @@ class Job(object):
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self._status = None
# Data access # Data access
@ -227,6 +237,7 @@ class Job(object):
self.exc_info = obj.get('exc_info') self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None # noqa
# Overwrite job's additional attrs (those not in JOB_ATTRS), if any # Overwrite job's additional attrs (those not in JOB_ATTRS), if any
additional_attrs = set(obj.keys()).difference(JOB_ATTRS) additional_attrs = set(obj.keys()).difference(JOB_ATTRS)
@ -258,6 +269,8 @@ class Job(object):
obj['timeout'] = self.timeout obj['timeout'] = self.timeout
if self.result_ttl is not None: if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl obj['result_ttl'] = self.result_ttl
if self._status is not None:
obj['status'] = self._status
""" """
Store additional attributes from job instance into Redis. This is done Store additional attributes from job instance into Redis. This is done
so that third party libraries using RQ can store additional data so that third party libraries using RQ can store additional data

@ -115,7 +115,8 @@ class Queue(object):
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Job.STATUS.queued)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):

@ -384,10 +384,12 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails # use the same exc handling when pickling fails
pickled_rv = dumps(rv) pickled_rv = dumps(rv)
job._status = job.STATUS.finished
except Exception as e: except Exception as e:
fq = self.failed_queue fq = self.failed_queue
self.log.exception(red(str(e))) self.log.exception(red(str(e)))
self.log.warning('Moving job to %s queue.' % fq.name) self.log.warning('Moving job to %s queue.' % fq.name)
job._status = job.STATUS.failed
fq.quarantine(job, exc_info=traceback.format_exc()) fq.quarantine(job, exc_info=traceback.format_exc())
return False return False
@ -397,23 +399,21 @@ class Worker(object):
else: else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
# Expire results """
has_result = rv is not None How long we persist the job result depends on the value of result_ttl:
explicit_ttl_requested = job.result_ttl is not None - If result_ttl is 0, cleanup the job immediately.
should_expire = has_result or explicit_ttl_requested - If it's a positive number, set the job to expire in X seconds.
if should_expire: - If result_ttl is negative, don't set an expiry to it (persist forever)
"""
result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl
if result_ttl == 0:
job.delete()
else:
p = self.connection.pipeline() p = self.connection.pipeline()
p.hset(job.key, 'result', pickled_rv) p.hset(job.key, 'result', pickled_rv)
p.hset(job.key, 'status', job._status)
if explicit_ttl_requested: if result_ttl > 0:
ttl = job.result_ttl p.expire(job.key, result_ttl)
else:
ttl = self.default_result_ttl
if ttl >= 0:
p.expire(job.key, ttl)
p.execute() p.execute()
else:
# Cleanup immediately
job.delete()
return True return True

@ -205,6 +205,12 @@ class TestQueue(RQTestCase):
None) None)
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
def test_enqueue_sets_status(self):
"""Enqueueing a job sets its status to "queued"."""
q = Queue()
job = q.enqueue(say_hello)
self.assertEqual(job.status, Job.STATUS.queued)
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):

@ -123,29 +123,6 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution # Should not have created evidence of execution
self.assertEquals(os.path.exists(SENTINEL_FILE), False) self.assertEquals(os.path.exists(SENTINEL_FILE), False)
def test_cleaning_up_of_jobs(self):
"""Jobs get cleaned up after successful execution."""
q = Queue()
job_with_rv = q.enqueue(say_hello, 'Franklin')
job_without_rv = q.enqueue(do_nothing)
# Job hashes exists
self.assertEquals(self.testconn.type(job_with_rv.key), 'hash')
self.assertEquals(self.testconn.type(job_without_rv.key), 'hash')
# Execute the job
w = Worker([q])
w.work(burst=True)
# First, assert that the job executed successfully
assert self.testconn.hget(job_with_rv.key, 'exc_info') is None
assert self.testconn.hget(job_without_rv.key, 'exc_info') is None
# Jobs with results expire after a certain TTL, while jobs without
# results are immediately removed
assert self.testconn.ttl(job_with_rv.key) > 0
assert not self.testconn.exists(job_without_rv.key)
@slow # noqa @slow # noqa
def test_timeouts(self): def test_timeouts(self):
"""Worker kills jobs after timeout.""" """Worker kills jobs after timeout."""
@ -187,3 +164,25 @@ class TestWorker(RQTestCase):
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), None) self.assertEqual(self.testconn.ttl(job.key), None)
# Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
w = Worker([q])
w.work(burst=True)
self.assertEqual(self.testconn.get(job.key), None)
def test_worker_sets_job_status(self):
"""Ensure that worker correctly sets job status."""
q = Queue()
w = Worker([q])
job = q.enqueue(say_hello)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.status, job.STATUS.finished)
# Failed jobs should set status to "failed"
job = q.enqueue(div_by_zero, args=(1,))
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.status, job.STATUS.failed)
Loading…
Cancel
Save