Workers can listen to external commands via pubsub (#1363)

* Added a way to send shutdown command via pubsub

* Added kill-horse command

* Added kill horse command

* Added send_kill_horse_command() and send_shutdown_command()

* Document worker commands
main
Selwin Ong 4 years ago committed by GitHub
parent 0e65bab10b
commit a721db34b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -371,3 +371,41 @@ If you want to disable RQ's default exception handler, use the `--disable-defaul
```console ```console
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler $ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler
``` ```
## Sending Commands to Worker
_New in version 1.6.0._
Starting in version 1.6.0, workers use Redis' pubsub mechanism to listen to external commands while
they're working. Two commands are currently implemented:
* `send_shutdown_command()`: sends a shutdown command to a worker. This is similar to sending a SIGINT
signal to the worker.
```python
from redis import Redis
from rq.command import send_shutdown_command
from rq.worker import Worker
redis = Redis()
workers = Worker.all(redis)
for worker in workers:
    send_shutdown_command(redis, worker.name)  # Tells worker to shutdown
```
* `send_kill_horse_command()`: tells a worker to cancel its currently executing job. If the worker is
not currently working, this command will be ignored.
```python
from redis import Redis
from rq.command import send_kill_horse_command
from rq.worker import Worker, WorkerStatus
redis = Redis()
workers = Worker.all(redis)
for worker in workers:
    if worker.state == WorkerStatus.BUSY:
        send_kill_horse_command(redis, worker.name)
```

@ -0,0 +1,25 @@
import json
# Channel name pattern; '%s' is filled in with the target worker's name.
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'


def send_command(redis, worker_name, command):
    """Publish `command` on the pubsub channel of the worker `worker_name`.

    The published message is a JSON object of the form
    ``{"command": <command>}``.

    Args:
        redis: client object exposing ``publish(channel, message)``.
        worker_name: name of the worker whose channel receives the message.
        command: command identifier, e.g. 'shutdown' or 'kill-horse'.
    """
    channel = PUBSUB_CHANNEL_TEMPLATE % worker_name
    message = json.dumps({'command': command})
    redis.publish(channel, message)
def parse_payload(payload):
    """Return the command data carried by a pubsub message as a dict.

    Args:
        payload: pubsub message mapping whose 'data' entry holds the
            JSON-encoded command, either as bytes or as text.

    Returns:
        The object decoded from the JSON document in ``payload['data']``.
    """
    data = payload.get('data')
    # The data arrives as bytes by default, but is already text when the
    # connection decodes responses; only decode when there is something
    # to decode instead of failing on str input.
    if isinstance(data, (bytes, bytearray)):
        data = data.decode()
    return json.loads(data)
def send_shutdown_command(redis, worker_name):
    """Ask the worker named `worker_name` to shut down."""
    command = 'shutdown'
    send_command(redis, worker_name, command)
def send_kill_horse_command(redis, worker_name):
    """Ask the worker named `worker_name` to kill its horse."""
    command = 'kill-horse'
    send_command(redis, worker_name, command)

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import errno import errno
import json
import logging import logging
import os import os
import random import random
@ -25,6 +26,7 @@ except ImportError:
from redis import WatchError from redis import WatchError
from . import worker_registration from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE
from .compat import PY2, as_text, string_types, text_type from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
@ -211,6 +213,8 @@ class Worker(object):
self.total_working_time = 0 self.total_working_time = 0
self.birth_date = None self.birth_date = None
self.scheduler = None self.scheduler = None
self.pubsub = None
self.pubsub_thread = None
self.disable_default_exception_handler = disable_default_exception_handler self.disable_default_exception_handler = disable_default_exception_handler
@ -245,6 +249,11 @@ class Worker(object):
"""Returns the worker's Redis hash key.""" """Returns the worker's Redis hash key."""
return self.redis_worker_namespace_prefix + self.name return self.redis_worker_namespace_prefix + self.name
@property
def pubsub_channel_name(self):
    """Returns the name of the pubsub channel this worker listens on
    for external commands (derived from the worker's name)."""
    return PUBSUB_CHANNEL_TEMPLATE % self.name
@property @property
def horse_pid(self): def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return """The horse's process ID. Only available in the worker. Will return
@ -385,7 +394,6 @@ class Worker(object):
"""Installs signal handlers for handling SIGINT and SIGTERM """Installs signal handlers for handling SIGINT and SIGTERM
gracefully. gracefully.
""" """
signal.signal(signal.SIGINT, self.request_stop) signal.signal(signal.SIGINT, self.request_stop)
signal.signal(signal.SIGTERM, self.request_stop) signal.signal(signal.SIGTERM, self.request_stop)
@ -438,9 +446,13 @@ class Worker(object):
signal.signal(signal.SIGTERM, self.request_force_stop) signal.signal(signal.SIGTERM, self.request_force_stop)
self.handle_warm_shutdown_request() self.handle_warm_shutdown_request()
self._shutdown()
# If shutdown is requested in the middle of a job, wait until
# finish before shutting down and save the request in redis def _shutdown(self):
"""
If shutdown is requested in the middle of a job, wait until
finish before shutting down and save the request in redis
"""
if self.get_state() == WorkerStatus.BUSY: if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True self._stop_requested = True
self.set_shutdown_requested_date() self.set_shutdown_requested_date()
@ -492,6 +504,22 @@ class Worker(object):
if self.scheduler and not self.scheduler._process: if self.scheduler and not self.scheduler._process:
self.scheduler.acquire_locks(auto_start=True) self.scheduler.acquire_locks(auto_start=True)
self.clean_registries() self.clean_registries()
def subscribe(self):
    """Start listening for commands on this worker's pubsub channel.

    Registers `handle_payload` as the handler for the channel and runs
    the pubsub listener in a thread, stored in `self.pubsub_thread`.
    """
    channel = self.pubsub_channel_name
    self.log.info('Subscribing to channel %s', channel)
    pubsub = self.connection.pubsub()
    self.pubsub = pubsub
    # The channel name is dynamic, so it is passed as an unpacked mapping
    # of channel name -> message handler.
    pubsub.subscribe(**{channel: self.handle_payload})
    self.pubsub_thread = pubsub.run_in_thread(sleep_time=0.2)
def unsubscribe(self):
    """Stop the pubsub listener thread and release the subscription.

    Does nothing when no listener thread has been started.
    """
    listener = self.pubsub_thread
    if not listener:
        return

    self.log.info('Unsubscribing from channel %s', self.pubsub_channel_name)
    # Stop and wait for the listener thread before tearing down the
    # subscription it is reading from.
    listener.stop()
    listener.join()
    self.pubsub.unsubscribe()
    self.pubsub.close()
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
@ -507,6 +535,7 @@ class Worker(object):
completed_jobs = 0 completed_jobs = 0
self.register_birth() self.register_birth()
self.log.info("Worker %s: started, version %s", self.key, VERSION) self.log.info("Worker %s: started, version %s", self.key, VERSION)
self.subscribe()
self.set_state(WorkerStatus.STARTED) self.set_state(WorkerStatus.STARTED)
qnames = self.queue_names() qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames))) self.log.info('*** Listening on %s...', green(', '.join(qnames)))
@ -538,7 +567,6 @@ class Worker(object):
break break
timeout = None if burst else max(1, self.default_worker_ttl - 15) timeout = None if burst else max(1, self.default_worker_ttl - 15)
result = self.dequeue_job_and_maintain_ttl(timeout) result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None: if result is None:
if burst: if burst:
@ -578,6 +606,7 @@ class Worker(object):
self.stop_scheduler() self.stop_scheduler()
self.register_death() self.register_death()
self.unsubscribe()
return bool(completed_jobs) return bool(completed_jobs)
def stop_scheduler(self): def stop_scheduler(self):
@ -742,6 +771,7 @@ class Worker(object):
# Send a heartbeat to keep the worker alive. # Send a heartbeat to keep the worker alive.
self.heartbeat() self.heartbeat()
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally. if ret_val == os.EX_OK: # The process exited normally.
return return
job_status = job.get_status() job_status = job.get_status()
@ -1047,6 +1077,20 @@ class Worker(object):
return True return True
return False return False
def handle_payload(self, message):
    """Handle a command message received on this worker's pubsub channel.

    Supported commands:

    * 'shutdown': sends SIGINT to the current process.
    * 'kill-horse': kills the horse if `horse_pid` is set; otherwise the
      command is ignored.

    Messages without a recognised command are logged and ignored.

    Args:
        message: raw pubsub message, decoded with `parse_payload`.
    """
    payload = parse_payload(message)
    # Look the command up with .get() so that a payload lacking the
    # 'command' key is ignored instead of raising from this handler.
    # NOTE(review): this handler runs in the pubsub listener thread;
    # presumably an uncaught exception here would stop it - confirm.
    command = payload.get('command')

    if command == 'shutdown':
        self.log.info('Received shutdown command, sending SIGINT signal.')
        pid = os.getpid()
        os.kill(pid, signal.SIGINT)
    elif command == 'kill-horse':
        self.log.info('Received kill horse command.')
        if self.horse_pid:
            self.log.info('Killing horse...')
            self.kill_horse()
        else:
            self.log.info('Worker is not working, ignoring kill horse command')
    else:
        self.log.warning('Ignoring unknown command: %s', command)
class SimpleWorker(Worker): class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs): def main_work_horse(self, *args, **kwargs):

@ -0,0 +1,45 @@
import time
from multiprocessing import Process
from tests import RQTestCase
from tests.fixtures import long_running_job
from rq import Queue, Worker
from rq.command import send_command, send_kill_horse_command, send_shutdown_command
class TestCommands(RQTestCase):
    """Tests for commands sent to a running worker via pubsub."""

    def test_shutdown_command(self):
        """Ensure that shutdown command works properly."""
        connection = self.testconn
        worker = Worker('foo', connection=connection)

        def _send_shutdown_command():
            """Waits 0.25 seconds before sending shutdown command"""
            time.sleep(0.25)
            send_shutdown_command(connection, worker.name)

        p = Process(target=_send_shutdown_command)
        p.start()
        # Not in burst mode and nothing is enqueued, so this call is
        # expected to return only once the shutdown command is handled.
        worker.work()
        p.join(1)

    def test_kill_horse_command(self):
        """Ensure that kill-horse command works properly."""
        connection = self.testconn
        queue = Queue('foo', connection=connection)
        job = queue.enqueue(long_running_job, 4)
        worker = Worker('foo', connection=connection)

        def _send_kill_horse_command():
            """Waits 0.25 seconds before sending kill-horse command"""
            time.sleep(0.25)
            send_kill_horse_command(connection, worker.name)

        p = Process(target=_send_kill_horse_command)
        p.start()
        worker.work(burst=True)
        p.join(1)

        # The job's horse was killed mid-run, so the job must have been
        # recorded as failed.
        job.refresh()
        self.assertIn(job.id, queue.failed_job_registry)
Loading…
Cancel
Save