diff --git a/rq/worker.py b/rq/worker.py index 4410ce6..a05a6ba 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -694,18 +694,20 @@ class Worker(object): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_DFL) - def prepare_job_execution(self, job): + def prepare_job_execution(self, job, heartbeat_ttl=None): """Performs misc bookkeeping like updating states prior to job execution. """ timeout = (job.timeout or 180) + 60 + if heartbeat_ttl is None: + heartbeat_ttl = self.job_monitoring_interval + 5 + with self.connection._pipeline() as pipeline: self.set_state(WorkerStatus.BUSY, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline) - self.heartbeat(self.job_monitoring_interval + 5, pipeline=pipeline) - registry = StartedJobRegistry(job.origin, - self.connection, + self.heartbeat(heartbeat_ttl, pipeline=pipeline) + registry = StartedJobRegistry(job.origin, self.connection, job_class=self.job_class) registry.add(job, timeout, pipeline=pipeline) job.set_status(JobStatus.STARTED, pipeline=pipeline) @@ -778,11 +780,11 @@ class Worker(object): except WatchError: continue - def perform_job(self, job, queue): + def perform_job(self, job, queue, heartbeat_ttl=None): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ - self.prepare_job_execution(job) + self.prepare_job_execution(job, heartbeat_ttl) push_connection(self.connection) @@ -910,9 +912,10 @@ class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): raise NotImplementedError("Test worker does not implement this method") - def execute_job(self, *args, **kwargs): + def execute_job(self, job, queue): """Execute job in same thread/process, do not fork()""" - return self.perform_job(*args, **kwargs) + timeout = (job.timeout or DEFAULT_WORKER_TTL) + 5 + return self.perform_job(job, queue, heartbeat_ttl=timeout) class HerokuWorker(Worker): diff --git a/tests/fixtures.py b/tests/fixtures.py index 4ef904a..b497b6c 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -119,6 +119,14 @@ def black_hole(job, *exc_info): return False +def save_key_ttl(key): + # Stores key ttl in meta + job = get_current_job() + ttl = job.connection.ttl(key) + job.meta = {'ttl': ttl} + job.save_meta() + + def long_running_job(timeout=10): time.sleep(timeout) return 'Done sleeping...' diff --git a/tests/test_worker.py b/tests/test_worker.py index d9c88b6..868ef8d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,11 +21,11 @@ import mock from mock import Mock from tests import RQTestCase, slow -from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, do_nothing, say_hello, say_pid, - run_dummy_heroku_worker, access_self, - modify_self, modify_self_and_error, - long_running_job) +from tests.fixtures import ( + create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, + say_pid, run_dummy_heroku_worker, access_self, modify_self, + modify_self_and_error, long_running_job, save_key_ttl +) from rq import (get_failed_queue, Queue, SimpleWorker, Worker, get_current_connection) @@ -537,6 +537,17 @@ class TestWorker(RQTestCase): self.assertEqual(job.result, os.getpid(), 'PID mismatch, fork() is not supposed to happen here') + def test_simpleworker_heartbeat_ttl(self): + """SimpleWorker's key must last longer than job.timeout when working""" + queue = Queue('foo') + + worker = SimpleWorker([queue]) + job_timeout = 300 + job = queue.enqueue(save_key_ttl, worker.key, timeout=job_timeout) + worker.work(burst=True) + job.refresh() + self.assertGreater(job.meta['ttl'], job_timeout) + def test_prepare_job_execution(self): """Prepare job execution does the necessary bookkeeping.""" queue = Queue(connection=self.testconn)