SimpleWorker's ttl must always be longer than jobs. (#1002)

main
Selwin Ong 6 years ago committed by GitHub
parent 051104b0ff
commit 47d291771f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -694,18 +694,20 @@ class Worker(object):
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
def prepare_job_execution(self, job): def prepare_job_execution(self, job, heartbeat_ttl=None):
"""Performs misc bookkeeping like updating states prior to """Performs misc bookkeeping like updating states prior to
job execution. job execution.
""" """
timeout = (job.timeout or 180) + 60 timeout = (job.timeout or 180) + 60
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
with self.connection._pipeline() as pipeline: with self.connection._pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline) self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(self.job_monitoring_interval + 5, pipeline=pipeline) self.heartbeat(heartbeat_ttl, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, registry = StartedJobRegistry(job.origin, self.connection,
self.connection,
job_class=self.job_class) job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline) registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline) job.set_status(JobStatus.STARTED, pipeline=pipeline)
@ -778,11 +780,11 @@ class Worker(object):
except WatchError: except WatchError:
continue continue
def perform_job(self, job, queue): def perform_job(self, job, queue, heartbeat_ttl=None):
"""Performs the actual work of a job. Will/should only be called """Performs the actual work of a job. Will/should only be called
inside the work horse's process. inside the work horse's process.
""" """
self.prepare_job_execution(job) self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection) push_connection(self.connection)
@ -910,9 +912,10 @@ class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs): def main_work_horse(self, *args, **kwargs):
raise NotImplementedError("Test worker does not implement this method") raise NotImplementedError("Test worker does not implement this method")
def execute_job(self, *args, **kwargs): def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()""" """Execute job in same thread/process, do not fork()"""
return self.perform_job(*args, **kwargs) timeout = (job.timeout or DEFAULT_WORKER_TTL) + 5
return self.perform_job(job, queue, heartbeat_ttl=timeout)
class HerokuWorker(Worker): class HerokuWorker(Worker):

@ -119,6 +119,14 @@ def black_hole(job, *exc_info):
return False return False
def save_key_ttl(key):
# Stores key ttl in meta
job = get_current_job()
ttl = job.connection.ttl(key)
job.meta = {'ttl': ttl}
job.save_meta()
def long_running_job(timeout=10): def long_running_job(timeout=10):
time.sleep(timeout) time.sleep(timeout)
return 'Done sleeping...' return 'Done sleeping...'

@ -21,11 +21,11 @@ import mock
from mock import Mock from mock import Mock
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (
div_by_zero, do_nothing, say_hello, say_pid, create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello,
run_dummy_heroku_worker, access_self, say_pid, run_dummy_heroku_worker, access_self, modify_self,
modify_self, modify_self_and_error, modify_self_and_error, long_running_job, save_key_ttl
long_running_job) )
from rq import (get_failed_queue, Queue, SimpleWorker, Worker, from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
get_current_connection) get_current_connection)
@ -537,6 +537,17 @@ class TestWorker(RQTestCase):
self.assertEqual(job.result, os.getpid(), self.assertEqual(job.result, os.getpid(),
'PID mismatch, fork() is not supposed to happen here') 'PID mismatch, fork() is not supposed to happen here')
def test_simpleworker_heartbeat_ttl(self):
"""SimpleWorker's key must last longer than job.timeout when working"""
queue = Queue('foo')
worker = SimpleWorker([queue])
job_timeout = 300
job = queue.enqueue(save_key_ttl, worker.key, timeout=job_timeout)
worker.work(burst=True)
job.refresh()
self.assertGreater(job.meta['ttl'], job_timeout)
def test_prepare_job_execution(self): def test_prepare_job_execution(self):
"""Prepare job execution does the necessary bookkeeping.""" """Prepare job execution does the necessary bookkeeping."""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)

Loading…
Cancel
Save