feat: Implement optional eager cordoning (#42)
* feat: optionally cordon all outdated nodes at the beginning instead of shortly before draining to prevent pods from being scheduled on outdated nodes during rolling updates

* fix: change env var name of eager cordoning process

Co-authored-by: Fabian Maier <fabian.maier@deutschebahn.com>
LaCodon and Fabian Maier authored Dec 1, 2022
1 parent d01c8c5 commit bb18d4b
Showing 6 changed files with 101 additions and 26 deletions.
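
In rough terms, the difference between the default behavior and the new `EAGER_CORDONING` mode can be sketched as follows. This is a simplified illustration with hypothetical `cordon`, `drain`, and `terminate` helpers, not the handler's actual code; the real logic lives in the `main.go` changes further down.

```go
package main

import "fmt"

type node struct{ name string }

// Hypothetical stand-ins for the handler's real cordon/drain/terminate steps.
func cordon(n node)    { fmt.Println("cordon", n.name) }
func drain(n node)     { fmt.Println("drain", n.name) }
func terminate(n node) { fmt.Println("terminate", n.name) }

func rollOut(outdated []node, eagerCordoning bool) {
	if eagerCordoning {
		// EAGER_CORDONING=true: mark every outdated node unschedulable up front,
		// so pods evicted later in the rollout cannot land on another outdated node.
		for _, n := range outdated {
			cordon(n)
		}
	}
	for _, n := range outdated {
		if !eagerCordoning {
			// Default: cordon each node only right before draining it.
			cordon(n)
		}
		drain(n)
		terminate(n)
	}
}

func main() {
	rollOut([]node{{"node-a"}, {"node-b"}}, true)
}
```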
31 changes: 16 additions & 15 deletions README.md
@@ -40,21 +40,22 @@ Therefore, this application will not run into any issues if it is restarted, res

## Usage

| Environment variable | Description | Required | Default |
|:-----------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------|:------------|
| CLUSTER_NAME | Name of the eks-cluster, used in place of `AUTODISCOVERRY_TAGS` and `AUTO_SCALING_GROUP_NAMES`. Checks for `k8s.io/cluster-autoscaler/<CLUSTER_NAME>: owned` and `k8s.io/cluster-autoscaler/enabled: true` tags on ASG | yes | `""` |
| AUTODISCOVERY_TAGS | Comma separated key value string with tags to autodiscover ASGs, used in place of `CLUSTER_NAME` and `AUTO_SCALING_GROUP_NAMES`. | yes | `""` |
| AUTO_SCALING_GROUP_NAMES | Comma-separated list of ASGs, CLUSTER_NAME takes priority. | yes | `""` |
| IGNORE_DAEMON_SETS | Whether to ignore DaemonSets when draining the nodes | no | `true` |
| DELETE_EMPTY_DIR_DATA | Whether to delete empty dir data when draining the nodes | no | `true` |
| AWS_REGION | Self-explanatory | no | `us-west-2` |
| ENVIRONMENT | If set to `dev`, will try to create the Kubernetes client using your local kubeconfig. Any other values will use the in-cluster configuration | no | `""` |
| EXECUTION_INTERVAL | Duration to sleep between each execution in seconds | no | `20` |
| EXECUTION_TIMEOUT | Maximum execution duration before timing out in seconds | no | `900` |
| POD_TERMINATION_GRACE_PERIOD | How long to wait for a pod to terminate in seconds; 0 means "delete immediately"; set to a negative value to use the pod's terminationGracePeriodSeconds. | no | `-1` |
| METRICS_PORT | Port to bind metrics server to | no | `8080` |
| METRICS | Expose metrics in Prometheus format at `:${METRICS_PORT}/metrics` | no | `""` |
| SLOW_MODE | If enabled, every time a node is terminated during an execution, the current execution will stop rather than continuing to the next ASG | no | `false` |
| Environment variable | Description | Required | Default |
|:------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------|:------------|
| CLUSTER_NAME | Name of the EKS cluster, used in place of `AUTODISCOVERY_TAGS` and `AUTO_SCALING_GROUP_NAMES`. Checks for `k8s.io/cluster-autoscaler/<CLUSTER_NAME>: owned` and `k8s.io/cluster-autoscaler/enabled: true` tags on the ASG | yes | `""` |
| AUTODISCOVERY_TAGS | Comma separated key value string with tags to autodiscover ASGs, used in place of `CLUSTER_NAME` and `AUTO_SCALING_GROUP_NAMES`. | yes | `""` |
| AUTO_SCALING_GROUP_NAMES | Comma-separated list of ASGs, CLUSTER_NAME takes priority. | yes | `""` |
| IGNORE_DAEMON_SETS | Whether to ignore DaemonSets when draining the nodes | no | `true` |
| DELETE_EMPTY_DIR_DATA | Whether to delete empty dir data when draining the nodes | no | `true` |
| AWS_REGION | Self-explanatory | no | `us-west-2` |
| ENVIRONMENT | If set to `dev`, will try to create the Kubernetes client using your local kubeconfig. Any other values will use the in-cluster configuration | no | `""` |
| EXECUTION_INTERVAL | Duration to sleep between each execution in seconds | no | `20` |
| EXECUTION_TIMEOUT | Maximum execution duration before timing out in seconds | no | `900` |
| POD_TERMINATION_GRACE_PERIOD | How long to wait for a pod to terminate in seconds; 0 means "delete immediately"; set to a negative value to use the pod's terminationGracePeriodSeconds. | no | `-1` |
| METRICS_PORT | Port to bind metrics server to | no | `8080` |
| METRICS | Expose metrics in Prometheus format at `:${METRICS_PORT}/metrics` | no | `""` |
| SLOW_MODE | If enabled, every time a node is terminated during an execution, the current execution will stop rather than continuing to the next ASG | no | `false` |
| EAGER_CORDONING | If enabled, all outdated nodes will be cordoned before any rolling update action. The default mode is to cordon a node just before draining it. See [#41](https://github.com/TwiN/aws-eks-asg-rolling-update-handler/issues/41) for possible consequences of enabling this. | no | `false` |

**NOTE:** Only one of `CLUSTER_NAME`, `AUTODISCOVERY_TAGS` or `AUTO_SCALING_GROUP_NAMES` must be set.

9 changes: 6 additions & 3 deletions config/config.go
@@ -27,6 +27,7 @@ const (
EnvMetrics = "METRICS"
EnvMetricsPort = "METRICS_PORT"
EnvSlowMode = "SLOW_MODE"
EnvEagerCordoning = "EAGER_CORDONING"
)

type config struct {
@@ -43,14 +44,16 @@ type config struct {
Metrics bool // Defaults to false
MetricsPort int // Defaults to 8080
SlowMode bool // Defaults to false
EagerCordoning bool // Defaults to false
}

// Initialize is used to initialize the application's configuration
func Initialize() error {
cfg = &config{
Environment: strings.ToLower(os.Getenv(EnvEnvironment)),
Debug: strings.ToLower(os.Getenv(EnvDebug)) == "true",
SlowMode: strings.ToLower(os.Getenv(EnvSlowMode)) == "true",
Environment: strings.ToLower(os.Getenv(EnvEnvironment)),
Debug: strings.ToLower(os.Getenv(EnvDebug)) == "true",
SlowMode: strings.ToLower(os.Getenv(EnvSlowMode)) == "true",
EagerCordoning: strings.ToLower(os.Getenv(EnvEagerCordoning)) == "true",
}
if clusterName := os.Getenv(EnvClusterName); len(clusterName) > 0 {
cfg.AutodiscoveryTags = fmt.Sprintf("k8s.io/cluster-autoscaler/%s=owned,k8s.io/cluster-autoscaler/enabled=true", clusterName)
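
For reference, a minimal sketch of how the new flag is consumed through this config package. It is not part of the diff; the module import path and the need to set one of the ASG-discovery variables (here `CLUSTER_NAME`) are assumptions.

```go
package main

import (
	"log"
	"os"

	"github.com/TwiN/aws-eks-asg-rolling-update-handler/config" // assumed module path
)

func main() {
	// The value is lowercased before comparison, so anything other than "true"
	// (case-insensitively) leaves EagerCordoning at its false default.
	os.Setenv("EAGER_CORDONING", "TRUE")
	os.Setenv("CLUSTER_NAME", "my-cluster") // assumed: one discovery variable is required

	if err := config.Initialize(); err != nil {
		log.Fatalf("failed to initialize configuration: %v", err)
	}
	log.Printf("eager cordoning enabled: %v", config.Get().EagerCordoning) // true
}
```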
23 changes: 19 additions & 4 deletions k8s/client.go
@@ -17,6 +17,7 @@ import (

const (
AnnotationRollingUpdateStartedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/started-at"
AnnotationRollingUpdateCordonedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/cordoned-at"
AnnotationRollingUpdateDrainedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/drained-at"
AnnotationRollingUpdateTerminatedTimestamp = "aws-eks-asg-rolling-update-handler.twin.sh/terminated-at"

@@ -33,6 +34,7 @@ type ClientAPI interface {
GetNodeByAutoScalingInstance(instance *autoscaling.Instance) (*v1.Node, error)
FilterNodeByAutoScalingInstance(nodes []v1.Node, instance *autoscaling.Instance) (*v1.Node, error)
UpdateNode(node *v1.Node) error
Cordon(nodeName string) error
Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData bool, podTerminationGracePeriod int) error
}

@@ -108,6 +110,23 @@ func (k *Client) UpdateNode(node *v1.Node) error {
return err
}

// Cordon disables scheduling new pods onto the given node
func (k *Client) Cordon(nodeName string) error {
node, err := k.client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return err
}
drainer := &drain.Helper{
Client: k.client,
Ctx: context.TODO(),
}
if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil {
log.Printf("[%s][CORDONER] Failed to cordon node: %v", node.Name, err)
return err
}
return nil
}

// Drain gracefully deletes all pods from a given node
func (k *Client) Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData bool, podTerminationGracePeriod int) error {
node, err := k.client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
@@ -128,10 +147,6 @@ func (k *Client) Drain(nodeName string, ignoreDaemonSets, deleteEmptyDirData boo
log.Printf("[%s][DRAINER] evicted pod %s/%s", nodeName, pod.Namespace, pod.Name)
},
}
if err := drain.RunCordonOrUncordon(drainer, node, true); err != nil {
log.Printf("[%s][DRAINER] Failed to cordon node: %v", node.Name, err)
return err
}
if err := drain.RunNodeDrain(drainer, node.Name); err != nil {
log.Printf("[%s][DRAINER] Failed to drain node: %v", node.Name, err)
return err
6 changes: 4 additions & 2 deletions k8s/client_test.go
@@ -11,8 +11,10 @@ import (
func TestClient_Drain(t *testing.T) {
fakeKubernetesClient := fakekubernetes.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
kc := NewClient(fakeKubernetesClient)
err := kc.Drain("default", true, true, -1)
if err != nil {
if err := kc.Cordon("default"); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if err := kc.Drain("default", true, true, -1); err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
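
Illustrative only, and not part of this commit: the effect of the new `Cordon` method could additionally be asserted against the fake clientset by checking that the node's `Spec.Unschedulable` flag is set, roughly like this.

```go
package k8s

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakekubernetes "k8s.io/client-go/kubernetes/fake"
)

func TestClient_Cordon_MarksNodeUnschedulable(t *testing.T) {
	fakeKubernetesClient := fakekubernetes.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
	kc := NewClient(fakeKubernetesClient)
	if err := kc.Cordon("default"); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	// Cordoning should have flipped the node's Unschedulable flag.
	node, err := fakeKubernetesClient.CoreV1().Nodes().Get(context.TODO(), "default", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if !node.Spec.Unschedulable {
		t.Error("expected node to be marked unschedulable after cordoning")
	}
}
```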
5 changes: 5 additions & 0 deletions k8stest/k8stest.go
@@ -76,6 +76,11 @@ func (mock *MockClient) UpdateNode(node *v1.Node) error {
return nil
}

func (mock *MockClient) Cordon(nodeName string) error {
mock.Counter["Cordon"]++
return nil
}

func (mock *MockClient) Drain(nodeName string, ignoreDaemonSets, deleteLocalData bool, podTerminationGracePeriod int) error {
mock.Counter["Drain"]++
return nil
53 changes: 51 additions & 2 deletions main.go
@@ -151,13 +151,39 @@ func DoHandleRollingUpgrade(client k8s.ClientAPI, ec2Service ec2iface.EC2API, au
rand.Shuffle(len(outdatedInstances), func(i, j int) {
outdatedInstances[i], outdatedInstances[j] = outdatedInstances[j], outdatedInstances[i]
})

if config.Get().EagerCordoning {
for _, outdatedInstance := range outdatedInstances {
node, err := client.GetNodeByAutoScalingInstance(outdatedInstance)
if err != nil {
log.Printf("[%s][%s] Skipping because unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
continue
}

_, minutesSinceCordoned, _, _ := getRollingUpdateTimestampsFromNode(node)
if minutesSinceCordoned == -1 {
log.Printf("[%s][%s] Cordoning node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
if err := client.Cordon(node.Name); err != nil {
metrics.Server.Errors.Inc()
log.Printf("[%s][%s] Skipping because ran into error while cordoning node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
continue
}

if err := k8s.AnnotateNodeByAutoScalingInstance(client, outdatedInstance, k8s.AnnotationRollingUpdateCordonedTimestamp, time.Now().Format(time.RFC3339)); err != nil {
log.Printf("[%s][%s] Skipping because unable to annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
continue
}
}
}
}

for _, outdatedInstance := range outdatedInstances {
node, err := client.GetNodeByAutoScalingInstance(outdatedInstance)
if err != nil {
log.Printf("[%s][%s] Skipping because unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
continue
}
minutesSinceStarted, minutesSinceDrained, minutesSinceTerminated := getRollingUpdateTimestampsFromNode(node)
minutesSinceStarted, minutesSinceCordoned, minutesSinceDrained, minutesSinceTerminated := getRollingUpdateTimestampsFromNode(node)
// Check if outdated nodes in k8s have been marked with annotation from aws-eks-asg-rolling-update-handler
if minutesSinceStarted == -1 {
log.Printf("[%s][%s] Starting node rollout process", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
@@ -173,6 +199,20 @@
hasEnoughResources := k8s.CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(client, node, updatedReadyNodes)
if hasEnoughResources {
log.Printf("[%s][%s] Updated nodes have enough resources available", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
if minutesSinceCordoned == -1 {
log.Printf("[%s][%s] Cordoning node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
err := client.Cordon(node.Name)
if err != nil {
metrics.Server.Errors.Inc()
log.Printf("[%s][%s] Skipping because ran into error while cordoning node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
continue
} else {
// Only annotate if no error was encountered
_ = k8s.AnnotateNodeByAutoScalingInstance(client, outdatedInstance, k8s.AnnotationRollingUpdateCordonedTimestamp, time.Now().Format(time.RFC3339))
}
} else {
log.Printf("[%s][%s] Node has already been cordoned %d minutes ago, skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), minutesSinceCordoned)
}
if minutesSinceDrained == -1 {
log.Printf("[%s][%s] Draining node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
err := client.Drain(node.Name, config.Get().IgnoreDaemonSets, config.Get().DeleteEmptyDirData, config.Get().PodTerminationGracePeriod)
@@ -312,7 +352,7 @@ func getReadyNodesAndNumberOfNonReadyNodesOrInstances(client k8s.ClientAPI, upda
return updatedReadyNodes, numberOfNonReadyNodesOrInstances
}

func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int, minutesSinceDrained int, minutesSinceTerminated int) {
func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int, minutesSinceCordoned int, minutesSinceDrained int, minutesSinceTerminated int) {
rollingUpdateStartedAt, ok := node.Annotations[k8s.AnnotationRollingUpdateStartedTimestamp]
if ok {
startedAt, err := time.Parse(time.RFC3339, rollingUpdateStartedAt)
@@ -322,6 +362,15 @@ func getRollingUpdateTimestampsFromNode(node *v1.Node) (minutesSinceStarted int,
} else {
minutesSinceStarted = -1
}
cordonedAtValue, ok := node.Annotations[k8s.AnnotationRollingUpdateCordonedTimestamp]
if ok {
cordonedAt, err := time.Parse(time.RFC3339, cordonedAtValue)
if err == nil {
minutesSinceCordoned = int(time.Since(cordonedAt).Minutes())
}
} else {
minutesSinceCordoned = -1
}
drainedAtValue, ok := node.Annotations[k8s.AnnotationRollingUpdateDrainedTimestamp]
if ok {
drainedAt, err := time.Parse(time.RFC3339, drainedAtValue)
