Skip to content
This repository has been archived by the owner on Jul 9, 2024. It is now read-only.

Commit

Permalink
Cordon and drain nodes before terminating. (#21)
Browse files Browse the repository at this point in the history
Testing Done:

- Added unit tests.
- Verified on a real cluster that the nodes were cordoned and drained.

Closes #19.
  • Loading branch information
shrinandj committed Feb 15, 2019
1 parent ce4090e commit ad69c2a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 1 deletion.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ FROM shrinand/k8s-minion-manager:v0.1

COPY . /
ENV PYTHONPATH=/
RUN mv binaries/kubectl-v1.12.3-linux-amd64 /usr/local/bin/kubectl
RUN chmod u+x minion_manager.py
Binary file added binaries/kubectl-v1.12.3-linux-amd64
Binary file not shown.
34 changes: 34 additions & 0 deletions cloud_provider/aws/aws_minion_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
from retrying import retry
from bunch import bunchify
import pytz
import shlex
import subprocess
from constants import SECONDS_PER_MINUTE, SECONDS_PER_HOUR
from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor
from cloud_provider.aws.price_info_reporter import AWSPriceReporter
from kubernetes import client, config
from ..base import MinionManagerBase
from .asg_mm import AWSAutoscalinGroupMM

Expand Down Expand Up @@ -341,6 +344,34 @@ def wait_for_all_running(self, asg_meta):
all_done = False
time.sleep(60)

def get_name_for_instance(self, instance):
    """Return the Kubernetes node name that corresponds to ``instance``.

    Loads the in-cluster Kubernetes configuration, lists all nodes and
    returns the name of the first node whose ``spec.provider_id`` contains
    the instance's ``InstanceId``.

    Args:
        instance: Object exposing an ``InstanceId`` attribute.

    Returns:
        The matching node's ``metadata.name``, or ``None`` when no node
        matches.
    """
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    for item in v1.list_node().items:
        # provider_id is optional in the node spec; a node without one can
        # never match and must not abort the scan with a TypeError.
        provider_id = item.spec.provider_id
        if provider_id and instance.InstanceId in provider_id:
            logger.info("Instance name for %s in Kubernetes clusters is %s",
                        instance.InstanceId, item.metadata.name)
            return item.metadata.name
    return None

def cordon_node(self, instance):
    """Drain the Kubernetes node backing ``instance`` via ``kubectl drain``.

    The node name is resolved with ``get_name_for_instance``. If the
    instance is not a known node, nothing is drained. If the drain fails,
    the node is uncordoned again so it is not left unschedulable.

    Args:
        instance: Object exposing an ``InstanceId`` attribute.

    Returns:
        Always ``True``, whether or not the drain succeeded.

    Raises:
        Exception: Whatever ``kubectl uncordon`` raises if the fallback
            uncordon itself fails; that error is deliberately not swallowed.
    """
    instance_name = self.get_name_for_instance(instance)
    if not instance_name:
        logger.info("Instance %s not found in Kubernetes cluster. Will not drain the instance.",
                    instance.InstanceId)
        return True

    # Pass argv lists straight to subprocess: no shell is involved and the
    # node name can never be mis-tokenised the way a joined string could be.
    drain_cmd = ["kubectl", "drain", instance_name,
                 "--ignore-daemonsets=true", "--delete-local-data=true"]
    try:
        subprocess.check_call(drain_cmd)
    except Exception as ex:
        # Best effort: undo the cordon that a partially completed drain
        # may have left behind.
        logger.info("Failed to drain node: %s. Will try to uncordon", ex)
        subprocess.check_call(["kubectl", "uncordon", instance_name])
        logger.info("Uncordoned node %s", instance_name)
    else:
        logger.info("Drained instance %s", instance_name)
    return True

@retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3)
def run_or_die(self, instance, asg_meta, asg_semaphore):
""" Terminates the given instance. """
Expand Down Expand Up @@ -369,6 +400,9 @@ def run_or_die(self, instance, asg_meta, asg_semaphore):
asg_meta.get_instance_name(instance), instance.InstanceId, asg_meta.get_name())
return False

# Cordon and drain the node first
self.cordon_node(instance)

self._ec2_client.terminate_instances(InstanceIds=[instance.InstanceId])
logger.info("Terminated instance %s", instance.InstanceId)
asg_meta.remove_instance(instance.InstanceId)
Expand Down
20 changes: 19 additions & 1 deletion cloud_provider/aws/aws_minion_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import unittest
import mock
import pytest
import subprocess
import shlex
from cloud_provider.aws.aws_minion_manager import AWSMinionManager
from cloud_provider.aws.aws_bid_advisor import AWSBidAdvisor
from moto import mock_autoscaling, mock_sts, mock_ec2
Expand Down Expand Up @@ -336,8 +338,24 @@ def _semaphore_helper(minion_manager_tag, percentage, outcome):
awsmm.terminate_percentage = percentage
get_semaphore = awsmm.set_semaphore(asg_meta)
assert get_semaphore._Semaphore__value == outcome

_semaphore_helper('use-spot', 1, 1)
_semaphore_helper('use-spot', 30, 1)
_semaphore_helper('use-spot', 60, 2)
_semaphore_helper('use-spot', 100, 3)

@mock.patch('subprocess.check_call')
@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.get_name_for_instance')
@mock_autoscaling
@mock_ec2
@mock_sts
def test_cordon(self, mock_get_name_for_instance, mock_check_call):
    """cordon_node drains the node and uncordons it when the drain fails."""
    node_name = "ip-of-fake-node-name"
    drain_argv = ['kubectl', 'drain', node_name,
                  '--ignore-daemonsets=true', '--delete-local-data=true']
    uncordon_argv = ['kubectl', 'uncordon', node_name]

    awsmm = self.basic_setup_and_test()
    mock_get_name_for_instance.return_value = node_name

    # Happy path: the last kubectl invocation is the drain.
    awsmm.cordon_node("ip-of-fake-node")
    mock_check_call.assert_called_with(drain_argv)

    # Drain raises on the first call; the follow-up call must be the uncordon.
    mock_check_call.side_effect = [Exception("Test"), True]
    awsmm.cordon_node("ip-of-fake-node")
    mock_check_call.assert_called_with(uncordon_argv)

0 comments on commit ad69c2a

Please sign in to comment.