diff --git a/rq/job.py b/rq/job.py index 775abb2..a50d24a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -69,7 +69,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None): + result_ttl=None, status=None, description=None, depends_on=None, timeout=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -91,6 +91,7 @@ class Job(object): job._kwargs = kwargs job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.timeout = timeout job._status = status # dependency could be job instance or id if depends_on is not None: diff --git a/rq/queue.py b/rq/queue.py index 2cf2ccd..14c4a5e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,6 +22,7 @@ def compact(lst): @total_ordering class Queue(object): + DEFAULT_TIMEOUT = 180 # Default timeout seconds. redis_queue_namespace_prefix = 'rq:queue:' redis_queues_keys = 'rq:queues' @@ -153,7 +154,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, - description=description, depends_on=depends_on) + description=description, depends_on=depends_on, timeout=timeout) # If job depends on an unfinished job, register itself on it's # parent's dependents instead of enqueueing it. @@ -172,7 +173,7 @@ class Queue(object): except WatchError: continue - return self.enqueue_job(job, timeout=timeout) + return self.enqueue_job(job) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues @@ -211,13 +212,9 @@ class Queue(object): timeout=timeout, result_ttl=result_ttl, description=description, depends_on=depends_on) - def enqueue_job(self, job, timeout=None, set_meta_data=True): + def enqueue_job(self, job, set_meta_data=True): """Enqueues a job for delayed execution. - When the `timeout` argument is sent, it will overrides the default - timeout value of 180 seconds. `timeout` may either be a string or - integer. - If the `set_meta_data` argument is `True` (default), it will update the properties `origin` and `enqueued_at`. @@ -230,10 +227,8 @@ class Queue(object): job.origin = self.name job.enqueued_at = times.now() - if timeout: - job.timeout = timeout # _timeout_in_seconds(timeout) - else: - job.timeout = 180 # default + if job.timeout is None: + job.timeout = self.DEFAULT_TIMEOUT job.save() if self._async: @@ -379,7 +374,7 @@ class FailedQueue(Queue): """ job.ended_at = times.now() job.exc_info = exc_info - return self.enqueue_job(job, timeout=job.timeout, set_meta_data=False) + return self.enqueue_job(job, set_meta_data=False) def requeue(self, job_id): """Requeues the job with the given job ID.""" @@ -397,4 +392,4 @@ class FailedQueue(Queue): job.status = Status.QUEUED job.exc_info = None q = Queue(job.origin, connection=self.connection) - q.enqueue_job(job, timeout=job.timeout) + q.enqueue_job(job) diff --git a/rq/worker.py b/rq/worker.py index ddd71ab..09af23c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -323,7 +323,7 @@ class Worker(object): self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) - self.connection.expire(self.key, (job.timeout or 180) + 60) + self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) if job.status == 'finished': @@ -404,7 +404,7 @@ class Worker(object): job.origin, time.time())) try: - with death_penalty_after(job.timeout or 180): + with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT): rv = job.perform() # Pickle the result in the same try-except block since we need to diff --git a/tests/test_queue.py b/tests/test_queue.py index 716c3dc..0f0ea26 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -308,7 +308,7 @@ class TestQueue(RQTestCase): self.assertFalse(self.testconn.exists(parent_job.dependents_key)) def test_enqueue_job_with_dependency(self): - """Jobs are enqueued only when their dependencies are finished""" + """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() @@ -320,6 +320,23 @@ class TestQueue(RQTestCase): parent_job.save() job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) + + def test_enqueue_job_with_dependency_and_timeout(self): + """Jobs still know their specified timeout after being scheduled as a dependency.""" + # Job with unfinished dependency is not immediately enqueued + parent_job = Job.create(func=say_hello) + q = Queue() + job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) + self.assertEqual(q.job_ids, []) + self.assertEqual(job.timeout, 123) + + # Jobs dependent on finished jobs are immediately enqueued + parent_job.status = 'finished' + parent_job.save() + job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) + self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.timeout, 123) class TestFailedQueue(RQTestCase):