Merge branch 'selwin-master'

main
Vincent Driessen 12 years ago
commit 9633d36e85

@ -287,9 +287,10 @@ class Job(object):
self._status = obj.get('status') if obj.get('status') else None self._status = obj.get('status') if obj.get('status') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
def save(self): def save(self, pipeline=None):
"""Persists the current job instance to its corresponding Redis key.""" """Persists the current job instance to its corresponding Redis key."""
key = self.key key = self.key
connection = pipeline if pipeline is not None else self.connection
obj = {} obj = {}
obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')
@ -317,7 +318,7 @@ class Job(object):
if self.meta: if self.meta:
obj['meta'] = dumps(self.meta) obj['meta'] = dumps(self.meta)
self.connection.hmset(key, obj) connection.hmset(key, obj)
def cancel(self): def cancel(self):
"""Cancels the given job, which will prevent the job from ever being """Cancels the given job, which will prevent the job from ever being
@ -346,6 +347,13 @@ class Job(object):
return self._result return self._result
def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result
will be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.result_ttl is None else self.result_ttl
# Representation # Representation
def get_call_string(self): # noqa def get_call_string(self): # noqa
"""Returns a string representation of the call, formatted as a regular """Returns a string representation of the call, formatted as a regular
@ -359,6 +367,22 @@ class Job(object):
args = ', '.join(arg_list) args = ', '.join(arg_list)
return '%s(%s)' % (self.func_name, args) return '%s(%s)' % (self.func_name, args)
def cleanup(self, ttl=None, pipeline=None):
"""Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its
result depends on the value of result_ttl:
- If result_ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist
forever)
"""
if ttl == 0:
self.cancel()
elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl)
def __str__(self): def __str__(self):
return '<Job %s: %s>' % (self.id, self.description) return '<Job %s: %s>' % (self.id, self.description)

@ -13,7 +13,6 @@ import socket
import signal import signal
import traceback import traceback
import logging import logging
from cPickle import dumps
from .queue import Queue, get_failed_queue from .queue import Queue, get_failed_queue
from .connections import get_current_connection from .connections import get_current_connection
from .job import Job, Status from .job import Job, Status
@ -199,7 +198,7 @@ class Worker(object):
key = self.key key = self.key
now = time.time() now = time.time()
queues = ','.join(self.queue_names()) queues = ','.join(self.queue_names())
with self.connection.pipeline() as p: with self.connection._pipeline() as p:
p.delete(key) p.delete(key)
p.hset(key, 'birth', now) p.hset(key, 'birth', now)
p.hset(key, 'queues', queues) p.hset(key, 'queues', queues)
@ -210,7 +209,7 @@ class Worker(object):
def register_death(self): def register_death(self):
"""Registers its own death.""" """Registers its own death."""
self.log.debug('Registering death') self.log.debug('Registering death')
with self.connection.pipeline() as p: with self.connection._pipeline() as p:
# We cannot use self.state = 'dead' here, because that would # We cannot use self.state = 'dead' here, because that would
# rollback the pipeline # rollback the pipeline
p.srem(self.redis_workers_keys, self.key) p.srem(self.redis_workers_keys, self.key)
@ -412,9 +411,17 @@ class Worker(object):
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to
# use the same exc handling when pickling fails # use the same exc handling when pickling fails
pickled_rv = dumps(rv) job._result = rv
job._status = Status.FINISHED job._status = Status.FINISHED
job.ended_at = times.now() job.ended_at = times.now()
result_ttl = job.get_ttl(self.default_result_ttl)
pipeline = self.connection._pipeline()
if result_ttl != 0:
job.save(pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline)
pipeline.execute()
except: except:
# Use the public setter here, to immediately update Redis # Use the public setter here, to immediately update Redis
job.status = Status.FAILED job.status = Status.FAILED
@ -426,27 +433,12 @@ class Worker(object):
else: else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
# How long we persist the job result depends on the value of
# result_ttl:
# - If result_ttl is 0, cleanup the job immediately.
# - If it's a positive number, set the job to expire in X seconds.
# - If result_ttl is negative, don't set an expiry to it (persist
# forever)
result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa
if result_ttl == 0: if result_ttl == 0:
job.delete()
self.log.info('Result discarded immediately.') self.log.info('Result discarded immediately.')
else: elif result_ttl > 0:
p = self.connection.pipeline()
p.hset(job.key, 'result', pickled_rv)
p.hset(job.key, 'status', job._status)
p.hset(job.key, 'ended_at', times.format(job.ended_at, 'UTC'))
if result_ttl > 0:
p.expire(job.key, result_ttl)
self.log.info('Result is kept for %d seconds.' % result_ttl) self.log.info('Result is kept for %d seconds.' % result_ttl)
else: else:
self.log.warning('Result will never expire, clean up result key manually.') self.log.warning('Result will never expire, clean up result key manually.')
p.execute()
return True return True

@ -219,3 +219,33 @@ class TestJob(RQTestCase):
id = job.perform() id = job.perform()
self.assertEqual(job.id, id) self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self) self.assertEqual(job.func, access_self)
def test_get_ttl(self):
"""Getting job TTL."""
job_ttl = 1
default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_ttl)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl)
self.assertEqual(job.get_ttl(), job_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_ttl(), None)
def test_cleanup(self):
"""Test that jobs and results are expired properly."""
job = Job.create(func=say_hello)
job.save()
# Jobs with negative TTLs don't expire
job.cleanup(ttl=-1)
self.assertEqual(self.testconn.ttl(job.key), -1)
# Jobs with positive TTLs are eventually deleted
job.cleanup(ttl=100)
self.assertEqual(self.testconn.ttl(job.key), 100)
# Jobs with 0 TTL are immediately deleted
job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)

Loading…
Cancel
Save