Set timeout at Job creation instead of during enqueueing; made Queue.enqueue_call respect `timeout` when `after` is specified

main
Joshua Chia 11 years ago
parent fa0e9e0f95
commit b89d6c8990

@ -69,7 +69,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, dependency=None): result_ttl=None, status=None, description=None, dependency=None, timeout=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -91,6 +91,7 @@ class Job(object):
job._kwargs = kwargs job._kwargs = kwargs
job.description = description or job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job.timeout = timeout
job._status = status job._status = status
# dependency could be job instance or id # dependency could be job instance or id
if dependency is not None: if dependency is not None:

@ -22,6 +22,7 @@ def compact(lst):
@total_ordering @total_ordering
class Queue(object): class Queue(object):
DEFAULT_TIMEOUT = 180 # Default timeout seconds.
redis_queue_namespace_prefix = 'rq:queue:' redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues' redis_queues_keys = 'rq:queues'
@ -153,7 +154,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
description=description, dependency=after) description=description, dependency=after, timeout=timeout)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it. # parent's waitlist instead of enqueueing it.
@ -172,7 +173,7 @@ class Queue(object):
except WatchError: except WatchError:
continue continue
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
@ -211,13 +212,9 @@ class Queue(object):
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl,
description=description, after=after) description=description, after=after)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
When the `timeout` argument is sent, it will overrides the default
timeout value of 180 seconds. `timeout` may either be a string or
integer.
If the `set_meta_data` argument is `True` (default), it will update If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`. the properties `origin` and `enqueued_at`.
@ -230,10 +227,8 @@ class Queue(object):
job.origin = self.name job.origin = self.name
job.enqueued_at = times.now() job.enqueued_at = times.now()
if timeout: if job.timeout is None:
job.timeout = timeout # _timeout_in_seconds(timeout) job.timeout = self.DEFAULT_TIMEOUT
else:
job.timeout = 180 # default
job.save() job.save()
if self._async: if self._async:
@ -379,7 +374,7 @@ class FailedQueue(Queue):
""" """
job.ended_at = times.now() job.ended_at = times.now()
job.exc_info = exc_info job.exc_info = exc_info
return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False) return self.enqueue_job(job, set_meta_data=False)
def requeue(self, job_id): def requeue(self, job_id):
"""Requeues the job with the given job ID.""" """Requeues the job with the given job ID."""
@ -397,4 +392,4 @@ class FailedQueue(Queue):
job.status = Status.QUEUED job.status = Status.QUEUED
job.exc_info = None job.exc_info = None
q = Queue(job.origin, connection=self.connection) q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job, timeout=job.timeout) q.enqueue_job(job)

@ -323,7 +323,7 @@ class Worker(object):
self.log.info('%s: %s (%s)' % (green(queue.name), self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id)) blue(job.description), job.id))
self.connection.expire(self.key, (job.timeout or 180) + 60) self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
self.fork_and_perform_job(job) self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl) self.connection.expire(self.key, self.default_worker_ttl)
if job.status == 'finished': if job.status == 'finished':
@ -404,7 +404,7 @@ class Worker(object):
job.origin, time.time())) job.origin, time.time()))
try: try:
with death_penalty_after(job.timeout or 180): with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
rv = job.perform() rv = job.perform()
# Pickle the result in the same try-except block since we need to # Pickle the result in the same try-except block since we need to

@ -307,19 +307,24 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [job_1.id, job_2.id]) self.assertEqual(q.job_ids, [job_1.id, job_2.id])
self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self, timeout=None):
"""Jobs are enqueued only when their dependencies are finished""" """Jobs are enqueued only when their dependencies are finished"""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
q.enqueue_call(say_hello, after=parent_job) job = q.enqueue_call(say_hello, after=parent_job, timeout=timeout)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
self.assertEqual(job.timeout, timeout)
# Jobs dependent on finished jobs are immediately enqueued # Jobs dependent on finished jobs are immediately enqueued
parent_job.status = 'finished' parent_job.status = 'finished'
parent_job.save() parent_job.save()
job = q.enqueue_call(say_hello, after=parent_job) job = q.enqueue_call(say_hello, after=parent_job, timeout=timeout)
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT if timeout is None else timeout)
def test_enqueue_job_with_dependency_and_timeout(self):
self.test_enqueue_job_with_dependency(123)
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):

Loading…
Cancel
Save