diff --git a/rq/job.py b/rq/job.py index a10c340..8c80ba6 100644 --- a/rq/job.py +++ b/rq/job.py @@ -69,7 +69,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, dependency=None): + result_ttl=None, status=None, description=None, dependency=None, timeout=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -91,6 +91,7 @@ class Job(object): job._kwargs = kwargs job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.timeout = timeout job._status = status # dependency could be job instance or id if dependency is not None: diff --git a/rq/queue.py b/rq/queue.py index dcbeeba..94311af 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,6 +22,7 @@ def compact(lst): @total_ordering class Queue(object): + DEFAULT_TIMEOUT = 180 # Default timeout seconds. redis_queue_namespace_prefix = 'rq:queue:' redis_queues_keys = 'rq:queues' @@ -153,7 +154,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, - description=description, dependency=after) + description=description, dependency=after, timeout=timeout) # If job depends on an unfinished job, register itself on it's # parent's waitlist instead of enqueueing it. @@ -172,7 +173,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job, timeout=timeout) + return self.enqueue_job(job) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -211,13 +212,9 @@ class Queue(object): timeout=timeout, result_ttl=result_ttl, description=description, after=after) - def enqueue_job(self, job, timeout=None, set_meta_data=True): + def enqueue_job(self, job, set_meta_data=True): """Enqueues a job for delayed execution. - When the `timeout` argument is sent, it will overrides the default - timeout value of 180 seconds. `timeout` may either be a string or - integer. - If the `set_meta_data` argument is `True` (default), it will update the properties `origin` and `enqueued_at`. @@ -230,10 +227,8 @@ class Queue(object): job.origin = self.name job.enqueued_at = times.now() - if timeout: - job.timeout = timeout # _timeout_in_seconds(timeout) - else: - job.timeout = 180 # default + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT job.save() if self._async: @@ -379,7 +374,7 @@ class FailedQueue(Queue): """ job.ended_at = times.now() job.exc_info = exc_info - return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False) + return self.enqueue_job(job, set_meta_data=False) def requeue(self, job_id): """Requeues the job with the given job ID.""" @@ -397,4 +392,4 @@ class FailedQueue(Queue): job.status = Status.QUEUED job.exc_info = None q = Queue(job.origin, connection=self.connection) - q.enqueue_job(job, timeout=job.timeout) + q.enqueue_job(job) diff --git a/rq/worker.py b/rq/worker.py index e8b9e69..0e9c5af 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -323,7 +323,7 @@ class Worker(object): self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) - self.connection.expire(self.key, (job.timeout or 180) + 60) + self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) if job.status == 'finished': @@ -404,7 +404,7 @@ class Worker(object): job.origin, time.time())) try: - with death_penalty_after(job.timeout or 180): + with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): rv = job.perform() # Pickle the result in the same try-except block since we need to diff --git a/tests/test_queue.py b/tests/test_queue.py index e236f0c..cb3193d 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -307,19 +307,24 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job_1.id, job_2.id]) self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) - def test_enqueue_job_with_dependency(self): + def test_enqueue_job_with_dependency(self, timeout=None): """Jobs are enqueued only when their dependencies are finished""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() - q.enqueue_call(say_hello, after=parent_job) + job = q.enqueue_call(say_hello, after=parent_job, timeout=timeout) self.assertEqual(q.job_ids, []) + self.assertEqual(job.timeout, timeout) # Jobs dependent on finished jobs are immediately enqueued parent_job.status = 'finished' parent_job.save() - job = q.enqueue_call(say_hello, after=parent_job) + job = q.enqueue_call(say_hello, after=parent_job, timeout=timeout) self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT if timeout is None else timeout) + + def test_enqueue_job_with_dependency_and_timeout(self): + self.test_enqueue_job_with_dependency(123) class TestFailedQueue(RQTestCase):