Skip to content
This repository has been archived by the owner on Jul 9, 2024. It is now read-only.

Cordon and drain nodes before terminating. #21

Merged
merged 1 commit into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ FROM shrinand/k8s-minion-manager:v0.1

COPY . /
ENV PYTHONPATH=/
RUN mv binaries/kubectl-v1.12.3-linux-amd64 /usr/local/bin/kubectl
RUN chmod u+x minion_manager.py
Binary file added binaries/kubectl-v1.12.3-linux-amd64
Binary file not shown.
34 changes: 34 additions & 0 deletions cloud_provider/aws/aws_minion_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
from retrying import retry
from bunch import bunchify
import pytz
import shlex
import subprocess
from constants import SECONDS_PER_MINUTE, SECONDS_PER_HOUR
from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor
from cloud_provider.aws.price_info_reporter import AWSPriceReporter
from kubernetes import client, config
from ..base import MinionManagerBase
from .asg_mm import AWSAutoscalinGroupMM

Expand Down Expand Up @@ -341,6 +344,34 @@ def wait_for_all_running(self, asg_meta):
all_done = False
time.sleep(60)

def get_name_for_instance(self, instance):
config.load_incluster_config()
v1 = client.CoreV1Api()
for item in v1.list_node().items:
if instance.InstanceId in item.spec.provider_id:
logger.info("Instance name for %s in Kubernetes clusters is %s",
instance.InstanceId, item.metadata.name)
return item.metadata.name
return None

def cordon_node(self, instance):
"""" Runs 'kubectl drain' to actually drain the node."""
instance_name = self.get_name_for_instance(instance)
if instance_name:
try:
cmd = "kubectl drain " + instance_name + " --ignore-daemonsets=true --delete-local-data=true"
subprocess.check_call(shlex.split(cmd))
logger.info("Drained instance %s", instance_name)
except Exception as ex:
logger.info("Failed to drain node: " + str(ex) + ". Will try to uncordon")
cmd = "kubectl uncordon " + instance_name
subprocess.check_call(shlex.split(cmd))
logger.info("Uncordoned node " + instance_name)
else:
logger.info("Instance %s not found in Kubernetes cluster. Will not drain the instance.",
instance.InstanceId)
return True

@retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3)
def run_or_die(self, instance, asg_meta, asg_semaphore):
""" Terminates the given instance. """
Expand Down Expand Up @@ -369,6 +400,9 @@ def run_or_die(self, instance, asg_meta, asg_semaphore):
asg_meta.get_instance_name(instance), instance.InstanceId, asg_meta.get_name())
return False

# Cordon and drain the node first
self.cordon_node(instance)

self._ec2_client.terminate_instances(InstanceIds=[instance.InstanceId])
logger.info("Terminated instance %s", instance.InstanceId)
asg_meta.remove_instance(instance.InstanceId)
Expand Down
20 changes: 19 additions & 1 deletion cloud_provider/aws/aws_minion_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import unittest
import mock
import pytest
import subprocess
import shlex
from cloud_provider.aws.aws_minion_manager import AWSMinionManager
from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor
from moto import mock_autoscaling, mock_sts, mock_ec2
Expand Down Expand Up @@ -336,8 +338,24 @@ def _semaphore_helper(minion_manager_tag, percentage, outcome):
awsmm.terminate_percentage = percentage
get_semaphore = awsmm.set_semaphore(asg_meta)
assert get_semaphore._Semaphore__value == outcome

_semaphore_helper('use-spot', 1, 1)
_semaphore_helper('use-spot', 30, 1)
_semaphore_helper('use-spot', 60, 2)
_semaphore_helper('use-spot', 100, 3)

@mock.patch('subprocess.check_call')
@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.get_name_for_instance')
@mock_autoscaling
@mock_ec2
@mock_sts
def test_cordon(self, mock_get_name_for_instance, mock_check_call):
awsmm = self.basic_setup_and_test()
mock_get_name_for_instance.return_value = "ip-of-fake-node-name"
awsmm.cordon_node("ip-of-fake-node")
mock_check_call.assert_called_with(['kubectl', 'drain', 'ip-of-fake-node-name',
'--ignore-daemonsets=true', '--delete-local-data=true'])

mock_check_call.side_effect = [Exception("Test"), True]
awsmm.cordon_node("ip-of-fake-node")
mock_check_call.assert_called_with(['kubectl', 'uncordon', 'ip-of-fake-node-name'])