send_stop_job_command (#1376)

* Added send_stop_job_command().

* send_stop_job_command now accepts just connection and job_id

* Document send_job_job_command

* Updated test coverage
main
Selwin Ong 4 years ago committed by GitHub
parent f3e924cdd1
commit 492e77d86d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -409,3 +409,16 @@ for worker in workers:
if worker.state = WorkerStatus.BUSY: if worker.state = WorkerStatus.BUSY:
send_kill_horse_command(redis, worker.name) send_kill_horse_command(redis, worker.name)
``` ```
_New in version 1.7.0._
* `send_stop_job_command()`: tells worker to stop a job.
```python
from redis import Redis
from rq.command import send_stop_job_command
redis = Redis()
# This will raise an exception if job is invalid or not currently executing
send_stop_job_command(redis, job_id)
```

@ -1,13 +1,20 @@
import json import json
import os
import signal
from rq.exceptions import InvalidJobOperation
from rq.job import Job
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(redis, worker_name, command): def send_command(connection, worker_name, command, **kwargs):
"""Use Redis' pubsub mechanism to send a command""" """Use connection' pubsub mechanism to send a command"""
payload = {'command': command} payload = {'command': command}
redis.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload)) if kwargs:
payload.update(kwargs)
connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
def parse_payload(payload): def parse_payload(payload):
@ -15,11 +22,56 @@ def parse_payload(payload):
return json.loads(payload.get('data').decode()) return json.loads(payload.get('data').decode())
def send_shutdown_command(redis, worker_name): def send_shutdown_command(connection, worker_name):
"""Send shutdown command""" """Send shutdown command"""
send_command(redis, worker_name, 'shutdown') send_command(connection, worker_name, 'shutdown')
def send_kill_horse_command(redis, worker_name): def send_kill_horse_command(connection, worker_name):
"""Tell worker to kill it's horse""" """Tell worker to kill it's horse"""
send_command(redis, worker_name, 'kill-horse') send_command(connection, worker_name, 'kill-horse')
def send_stop_job_command(connection, job_id):
"""Instruct a worker to stop a job"""
job = Job.fetch(job_id, connection=connection)
if not job.worker_name:
raise InvalidJobOperation('Job is not currently executing')
send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
def handle_command(worker, payload):
"""Parses payload and routes commands"""
if payload['command'] == 'stop-job':
handle_stop_job_command(worker, payload)
elif payload['command'] == 'shutdown':
handle_shutdown_command(worker)
elif payload['command'] == 'kill-horse':
handle_kill_worker_command(worker, payload)
def handle_shutdown_command(worker):
"""Perform shutdown command"""
worker.log.info('Received shutdown command, sending SIGINT signal.')
pid = os.getpid()
os.kill(pid, signal.SIGINT)
def handle_kill_worker_command(worker, payload):
"""Stops work horse"""
worker.log.info('Received kill horse command.')
if worker.horse_pid:
worker.log.info('Kiling horse...')
worker.kill_horse()
else:
worker.log.info('Worker is not working, kill horse command ignored')
def handle_stop_job_command(worker, payload):
"""Handles stop job command"""
job_id = payload.get('job_id')
worker.log.debug('Received command to stop job %s', job_id)
if job_id and worker.get_current_job_id() == job_id:
worker.kill_horse()
else:
worker.log.info('Not working on job %s, command ignored.', job_id)

@ -45,7 +45,8 @@ def truncate_long_string(data, maxlen=75):
""" Truncates strings longer than maxlen """ Truncates strings longer than maxlen
""" """
return (data[:maxlen] + '...') if len(data) > maxlen else data return (data[:maxlen] + '...') if len(data) > maxlen else data
def cancel_job(job_id, connection=None): def cancel_job(job_id, connection=None):
"""Cancels the job with the given job ID, preventing execution. Discards """Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later). any job info (i.e. it can't be requeued later).

@ -25,7 +25,7 @@ except ImportError:
from redis import WatchError from redis import WatchError
from . import worker_registration from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .compat import as_text, string_types, text_type from .compat import as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
@ -1071,18 +1071,10 @@ class Worker(object):
return False return False
def handle_payload(self, message): def handle_payload(self, message):
"""Handle external commands"""
self.log.debug('Received message: %s', message)
payload = parse_payload(message) payload = parse_payload(message)
if payload['command'] == 'shutdown': handle_command(self, payload)
self.log.info('Received shutdown command, sending SIGINT signal.')
pid = os.getpid()
os.kill(pid, signal.SIGINT)
elif payload['command'] == 'kill-horse':
self.log.info('Received kill horse command.')
if self.horse_pid:
self.log.info('Kiling horse...')
self.kill_horse()
else:
self.log.info('Worker is not working, ignoring kill horse command')
class SimpleWorker(Worker): class SimpleWorker(Worker):

@ -6,7 +6,9 @@ from tests import RQTestCase
from tests.fixtures import long_running_job from tests.fixtures import long_running_job
from rq import Queue, Worker from rq import Queue, Worker
from rq.command import send_command, send_kill_horse_command, send_shutdown_command from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.worker import WorkerStatus
class TestCommands(RQTestCase): class TestCommands(RQTestCase):
@ -42,4 +44,54 @@ class TestCommands(RQTestCase):
worker.work(burst=True) worker.work(burst=True)
p.join(1) p.join(1)
job.refresh() job.refresh()
self.assertTrue(job.id in queue.failed_job_registry) self.assertTrue(job.id in queue.failed_job_registry)
def start_work():
worker.work()
p = Process(target=start_work)
p.start()
p.join(2)
send_kill_horse_command(connection, worker.name)
worker.refresh()
# Since worker is not busy, command will be ignored
self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
send_shutdown_command(connection, worker.name)
def test_stop_job_command(self):
"""Ensure that stop_job command works properly."""
connection = self.testconn
queue = Queue('foo', connection=connection)
job = queue.enqueue(long_running_job, 3)
worker = Worker('foo', connection=connection)
# If job is not executing, an error is raised
with self.assertRaises(InvalidJobOperation):
send_stop_job_command(connection, job_id=job.id)
# An exception is raised if job ID is invalid
with self.assertRaises(NoSuchJobError):
send_stop_job_command(connection, job_id='1')
def start_work():
worker.work(burst=True)
p = Process(target=start_work)
p.start()
p.join(1)
time.sleep(0.1)
send_command(connection, worker.name, 'stop-job', job_id=1)
time.sleep(0.25)
# Worker still working due to job_id mismatch
worker.refresh()
self.assertEqual(worker.get_state(), WorkerStatus.BUSY)
send_stop_job_command(connection, job_id=job.id)
time.sleep(0.25)
# Worker has stopped working
worker.refresh()
self.assertEqual(worker.get_state(), WorkerStatus.IDLE)

Loading…
Cancel
Save