diff --git a/Dockerfile b/Dockerfile index 0df5eb6..7d996b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,4 +2,5 @@ FROM shrinand/k8s-minion-manager:v0.1 COPY . / ENV PYTHONPATH=/ +RUN mv binaries/kubectl-v1.12.3-linux-amd64 /usr/local/bin/kubectl RUN chmod u+x minion_manager.py diff --git a/binaries/kubectl-v1.12.3-linux-amd64 b/binaries/kubectl-v1.12.3-linux-amd64 new file mode 100755 index 0000000..9b2bcee Binary files /dev/null and b/binaries/kubectl-v1.12.3-linux-amd64 differ diff --git a/cloud_provider/aws/aws_minion_manager.py b/cloud_provider/aws/aws_minion_manager.py index 0da5a91..57f15b8 100644 --- a/cloud_provider/aws/aws_minion_manager.py +++ b/cloud_provider/aws/aws_minion_manager.py @@ -11,9 +11,12 @@ from retrying import retry from bunch import bunchify import pytz +import shlex +import subprocess from constants import SECONDS_PER_MINUTE, SECONDS_PER_HOUR from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor from cloud_provider.aws.price_info_reporter import AWSPriceReporter +from kubernetes import client, config from ..base import MinionManagerBase from .asg_mm import AWSAutoscalinGroupMM @@ -341,6 +344,34 @@ def wait_for_all_running(self, asg_meta): all_done = False time.sleep(60) + def get_name_for_instance(self, instance): + config.load_incluster_config() + v1 = client.CoreV1Api() + for item in v1.list_node().items: + if instance.InstanceId in item.spec.provider_id: + logger.info("Instance name for %s in Kubernetes clusters is %s", + instance.InstanceId, item.metadata.name) + return item.metadata.name + return None + + def cordon_node(self, instance): + """" Runs 'kubectl drain' to actually drain the node.""" + instance_name = self.get_name_for_instance(instance) + if instance_name: + try: + cmd = "kubectl drain " + instance_name + " --ignore-daemonsets=true --delete-local-data=true" + subprocess.check_call(shlex.split(cmd)) + logger.info("Drained instance %s", instance_name) + except Exception as ex: + logger.info("Failed to drain node: " + str(ex) + ". Will try to uncordon") + cmd = "kubectl uncordon " + instance_name + subprocess.check_call(shlex.split(cmd)) + logger.info("Uncordoned node " + instance_name) + else: + logger.info("Instance %s not found in Kubernetes cluster. Will not drain the instance.", + instance.InstanceId) + return True + @retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3) def run_or_die(self, instance, asg_meta, asg_semaphore): """ Terminates the given instance. """ @@ -369,6 +400,9 @@ def run_or_die(self, instance, asg_meta, asg_semaphore): asg_meta.get_instance_name(instance), instance.InstanceId, asg_meta.get_name()) return False + # Cordon and drain the node first + self.cordon_node(instance) + self._ec2_client.terminate_instances(InstanceIds=[instance.InstanceId]) logger.info("Terminated instance %s", instance.InstanceId) asg_meta.remove_instance(instance.InstanceId) diff --git a/cloud_provider/aws/aws_minion_manager_test.py b/cloud_provider/aws/aws_minion_manager_test.py index 401ae56..f84a3ab 100644 --- a/cloud_provider/aws/aws_minion_manager_test.py +++ b/cloud_provider/aws/aws_minion_manager_test.py @@ -3,6 +3,8 @@ import unittest import mock import pytest +import subprocess +import shlex from cloud_provider.aws.aws_minion_manager import AWSMinionManager from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor from moto import mock_autoscaling, mock_sts, mock_ec2 @@ -336,8 +338,24 @@ def _semaphore_helper(minion_manager_tag, percentage, outcome): awsmm.terminate_percentage = percentage get_semaphore = awsmm.set_semaphore(asg_meta) assert get_semaphore._Semaphore__value == outcome - + _semaphore_helper('use-spot', 1, 1) _semaphore_helper('use-spot', 30, 1) _semaphore_helper('use-spot', 60, 2) _semaphore_helper('use-spot', 100, 3) + + @mock.patch('subprocess.check_call') + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.get_name_for_instance') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_cordon(self, mock_get_name_for_instance, mock_check_call): + awsmm = self.basic_setup_and_test() + mock_get_name_for_instance.return_value = "ip-of-fake-node-name" + awsmm.cordon_node("ip-of-fake-node") + mock_check_call.assert_called_with(['kubectl', 'drain', 'ip-of-fake-node-name', + '--ignore-daemonsets=true', '--delete-local-data=true']) + + mock_check_call.side_effect = [Exception("Test"), True] + awsmm.cordon_node("ip-of-fake-node") + mock_check_call.assert_called_with(['kubectl', 'uncordon', 'ip-of-fake-node-name']) \ No newline at end of file