diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 25d0095..950565d 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -371,3 +371,41 @@ If you want to disable RQ's default exception handler, use the `--disable-defaul ```console $ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler ``` + + +## Sending Commands to Worker +_New in version 1.6.0._ + +Starting in version 1.6.0, workers use Redis' pubsub mechanism to listen to external commands while +they're working. Two commands are currently implemented: + +* `send_shutdown_command()`: sends shutdown command to worker. This is similar to sending a SIGINT +signal to a worker. + +```python +from redis import Redis +from rq.command import send_shutdown_command +from rq.worker import Worker + +redis = Redis() + +workers = Worker.all(redis) +for worker in workers: + send_shutdown_command(redis, worker.name) # Tells worker to shutdown +``` + +* `send_kill_horse_command()`: tells a worker to cancel a currently executing job. If worker is +not currently working, this command will be ignored. + +```python +from redis import Redis +from rq.command import send_kill_horse_command +from rq.worker import Worker, WorkerStatus + +redis = Redis() + +workers = Worker.all(redis) +for worker in workers: + if worker.state = WorkerStatus.BUSY: + send_kill_horse_command(redis, worker.name) +``` diff --git a/rq/command.py b/rq/command.py new file mode 100644 index 0000000..be8fea8 --- /dev/null +++ b/rq/command.py @@ -0,0 +1,25 @@ +import json + + +PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' + + +def send_command(redis, worker_name, command): + """Use Redis' pubsub mechanism to send a command""" + payload = {'command': command} + redis.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) + + +def parse_payload(payload): + """Returns a dict of command data""" + return json.loads(payload.get('data').decode()) + + +def send_shutdown_command(redis, worker_name): + """Send shutdown command""" + send_command(redis, worker_name, 'shutdown') + + +def send_kill_horse_command(redis, worker_name): + """Tell worker to kill it's horse""" + send_command(redis, worker_name, 'kill-horse') diff --git a/rq/worker.py b/rq/worker.py index 88c72b0..13ed298 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import errno +import json import logging import os import random @@ -25,6 +26,7 @@ except ImportError: from redis import WatchError from . import worker_registration +from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE from .compat import PY2, as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection @@ -211,6 +213,8 @@ class Worker(object): self.total_working_time = 0 self.birth_date = None self.scheduler = None + self.pubsub = None + self.pubsub_thread = None self.disable_default_exception_handler = disable_default_exception_handler @@ -245,6 +249,11 @@ class Worker(object): """Returns the worker's Redis hash key.""" return self.redis_worker_namespace_prefix + self.name + @property + def pubsub_channel_name(self): + """Returns the worker's Redis hash key.""" + return PUBSUB_CHANNEL_TEMPLATE % self.name + @property def horse_pid(self): """The horse's process ID. Only available in the worker. Will return @@ -385,7 +394,6 @@ class Worker(object): """Installs signal handlers for handling SIGINT and SIGTERM gracefully. """ - signal.signal(signal.SIGINT, self.request_stop) signal.signal(signal.SIGTERM, self.request_stop) @@ -438,9 +446,13 @@ class Worker(object): signal.signal(signal.SIGTERM, self.request_force_stop) self.handle_warm_shutdown_request() - - # If shutdown is requested in the middle of a job, wait until - # finish before shutting down and save the request in redis + self._shutdown() + + def _shutdown(self): + """ + If shutdown is requested in the middle of a job, wait until + finish before shutting down and save the request in redis + """ if self.get_state() == WorkerStatus.BUSY: self._stop_requested = True self.set_shutdown_requested_date() @@ -492,6 +504,22 @@ class Worker(object): if self.scheduler and not self.scheduler._process: self.scheduler.acquire_locks(auto_start=True) self.clean_registries() + + def subscribe(self): + """Subscribe to this worker's channel""" + self.log.info('Subscribing to channel %s', self.pubsub_channel_name) + self.pubsub = self.connection.pubsub() + self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload}) + self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=0.2) + + def unsubscribe(self): + """Unsubscribe from pubsub channel""" + if self.pubsub_thread: + self.log.info('Unsubscribing from channel %s', self.pubsub_channel_name) + self.pubsub_thread.stop() + self.pubsub_thread.join() + self.pubsub.unsubscribe() + self.pubsub.close() def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): @@ -507,6 +535,7 @@ class Worker(object): completed_jobs = 0 self.register_birth() self.log.info("Worker %s: started, version %s", self.key, VERSION) + self.subscribe() self.set_state(WorkerStatus.STARTED) qnames = self.queue_names() self.log.info('*** Listening on %s...', green(', '.join(qnames))) @@ -538,7 +567,6 @@ class Worker(object): break timeout = None if burst else max(1, self.default_worker_ttl - 15) - result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: @@ -578,6 +606,7 @@ class Worker(object): self.stop_scheduler() self.register_death() + self.unsubscribe() return bool(completed_jobs) def stop_scheduler(self): @@ -742,6 +771,7 @@ class Worker(object): # Send a heartbeat to keep the worker alive. self.heartbeat() + self._horse_pid = 0 # Set horse PID to 0, horse has finished working if ret_val == os.EX_OK: # The process exited normally. return job_status = job.get_status() @@ -1047,6 +1077,20 @@ class Worker(object): return True return False + def handle_payload(self, message): + payload = parse_payload(message) + if payload['command'] == 'shutdown': + self.log.info('Received shutdown command, sending SIGINT signal.') + pid = os.getpid() + os.kill(pid, signal.SIGINT) + elif payload['command'] == 'kill-horse': + self.log.info('Received kill horse command.') + if self.horse_pid: + self.log.info('Kiling horse...') + self.kill_horse() + else: + self.log.info('Worker is not working, ignoring kill horse command') + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..647a17a --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,45 @@ +import time + +from multiprocessing import Process + +from tests import RQTestCase +from tests.fixtures import long_running_job + +from rq import Queue, Worker +from rq.command import send_command, send_kill_horse_command, send_shutdown_command + + +class TestCommands(RQTestCase): + + def test_shutdown_command(self): + """Ensure that shutdown command works properly.""" + connection = self.testconn + worker = Worker('foo', connection=connection) + + def _send_shutdown_command(): + time.sleep(0.25) + send_shutdown_command(connection, worker.name) + + p = Process(target=_send_shutdown_command) + p.start() + worker.work() + p.join(1) + + def test_kill_horse_command(self): + """Ensure that shutdown command works properly.""" + connection = self.testconn + queue = Queue('foo', connection=connection) + job = queue.enqueue(long_running_job, 4) + worker = Worker('foo', connection=connection) + + def _send_kill_horse_command(): + """Waits 0.25 seconds before sending kill-horse command""" + time.sleep(0.25) + send_kill_horse_command(connection, worker.name) + + p = Process(target=_send_kill_horse_command) + p.start() + worker.work(burst=True) + p.join(1) + job.refresh() + self.assertTrue(job.id in queue.failed_job_registry) \ No newline at end of file