Skip to content

Commit

Permalink
eks/mng: improve upgrade polling
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed Jun 27, 2020
1 parent 333f082 commit 4019d6d
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ See [code changes](https://github.com/aws/aws-k8s-tester/compare/v1.3.9...v1.4.0

### `eks`

- Run [query function while checking `eks/mng` version upgrades](https://github.com/aws/aws-k8s-tester/commit/).
- Improve and clean up [`eks/irsa`](https://github.com/aws/aws-k8s-tester/commit/12bf8c74cab92df3877606347cf5748ff8d3b89b).
- Add [`clusterloader --provider=eks` flag to `eks/cluster-loader`](https://github.com/aws/aws-k8s-tester/commit/dc406f03528902a318dabac10e824c3c06e2dd06).
- Add [`eks/cluster-loader` `CL2UseHostNetworkPods` support](https://github.com/aws/aws-k8s-tester/commit/23310e17d172491c44158a7d07290e2d172e5fdc).
Expand Down
51 changes: 45 additions & 6 deletions eks/cluster/wait/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ func Poll(
clusterName string,
desiredClusterStatus string,
initialWait time.Duration,
wait time.Duration,
) <-chan ClusterStatus {
interval time.Duration,
opts ...OpOption) <-chan ClusterStatus {

ret := Op{}
ret.applyOpts(opts)

lg.Info("polling cluster",
zap.String("cluster-name", clusterName),
zap.String("desired-status", desiredClusterStatus),
zap.String("initial-wait", initialWait.String()),
zap.String("interval", interval.String()),
)

now := time.Now()
Expand Down Expand Up @@ -85,7 +91,7 @@ func Poll(
// in case stack has already reached desired status
// wait from second interation
if waitDur == time.Duration(0) {
waitDur = wait
waitDur = interval
}
}

Expand Down Expand Up @@ -138,6 +144,10 @@ func Poll(
ch <- ClusterStatus{Cluster: cluster, Error: nil}
}

if ret.queryFunc != nil {
ret.queryFunc()
}

if first {
lg.Info("sleeping", zap.Duration("initial-wait", initialWait))
select {
Expand Down Expand Up @@ -198,12 +208,18 @@ func PollUpdate(
requestID string,
desiredUpdateStatus string,
initialWait time.Duration,
wait time.Duration,
) <-chan UpdateStatus {
interval time.Duration,
opts ...OpOption) <-chan UpdateStatus {

ret := Op{}
ret.applyOpts(opts)

lg.Info("polling cluster update",
zap.String("cluster-name", clusterName),
zap.String("request-id", requestID),
zap.String("desired-update-status", desiredUpdateStatus),
zap.String("initial-wait", initialWait.String()),
zap.String("interval", interval.String()),
)

now := time.Now()
Expand Down Expand Up @@ -235,7 +251,7 @@ func PollUpdate(
// in case stack has already reached desired status
// wait from second interation
if waitDur == time.Duration(0) {
waitDur = wait
waitDur = interval
}
}

Expand Down Expand Up @@ -291,6 +307,10 @@ func PollUpdate(
ch <- UpdateStatus{Update: update, Error: nil}
}

if ret.queryFunc != nil {
ret.queryFunc()
}

if first {
lg.Info("sleeping", zap.Duration("initial-wait", initialWait))
select {
Expand All @@ -317,3 +337,22 @@ func PollUpdate(
}()
return ch
}

// Op represents a MNG operation.
type Op struct {
queryFunc func()
}

// OpOption configures archiver operations.
type OpOption func(*Op)

// WithQueryFunc configures query function to be called in retry func.
func WithQueryFunc(f func()) OpOption {
return func(op *Op) { op.queryFunc = f }
}

func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
}
}
2 changes: 1 addition & 1 deletion eks/hollow-nodes/hollow-nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (ng *nodeGroup) checkNodes() (readyNodes []string, createdNodes []string, e
}
ng.cfg.Logger.Info("node is ready!",
zap.String("name", nodeName),
zap.String("type", fmt.Sprintf("%s", cond.Type)),
zap.String("status-type", fmt.Sprintf("%s", cond.Type)),
zap.String("status", fmt.Sprintf("%s", cond.Status)),
)
readyNodes = append(readyNodes, nodeName)
Expand Down
2 changes: 1 addition & 1 deletion eks/hollow-nodes/remote/hollow-nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (ts *tester) checkNodes() error {
}
ts.cfg.Logger.Info("node is ready!",
zap.String("name", nodeName),
zap.String("type", fmt.Sprintf("%s", cond.Type)),
zap.String("status-type", fmt.Sprintf("%s", cond.Type)),
zap.String("status", fmt.Sprintf("%s", cond.Status)),
)
createdNodeNames = append(createdNodeNames, nodeName)
Expand Down
8 changes: 5 additions & 3 deletions eks/mng/mng.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ func (ts *tester) UpgradeVersion() (err error) {
ts.cfg.Logger.Info("ManagedNodeGroup is not created; skipping upgrade")
return nil
}

for _, cur := range ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs {
if err = ts.versionUpgrader.Upgrade(cur.Name); err != nil {
for mngName := range ts.cfg.EKSConfig.AddOnManagedNodeGroups.MNGs {
if err = ts.versionUpgrader.Upgrade(mngName); err != nil {
return err
}
if err = ts.nodeWaiter.Wait(mngName, 3); err != nil {
return err
}
}
Expand Down
28 changes: 28 additions & 0 deletions eks/mng/version-upgrade/version-upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
aws_eks "github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go/service/eks/eksiface"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
)

// Upgrader defines MNG version upgrade interface.
Expand Down Expand Up @@ -139,6 +140,33 @@ func (ts *tester) Upgrade(mngName string) (err error) {
eks.UpdateStatusSuccessful,
initialWait,
30*time.Second,
wait.WithQueryFunc(func() {
println()
ts.cfg.Logger.Info("listing nodes while polling mng update status", zap.String("mng-name", mngName))
nodes, err := ts.cfg.K8SClient.ListNodes(150, 5*time.Second)
if err != nil {
ts.cfg.Logger.Warn("failed to list nodes while polling mng update status", zap.Error(err))
return
}
for _, node := range nodes {
labels := node.GetLabels()
if labels["NGName"] != mngName {
continue
}
for _, cond := range node.Status.Conditions {
if cond.Status != v1.ConditionTrue {
continue
}
ts.cfg.Logger.Info("node",
zap.String("name", node.GetName()),
zap.String("mng-name", mngName),
zap.String("status-type", fmt.Sprintf("%s", cond.Type)),
zap.String("status", fmt.Sprintf("%s", cond.Status)),
)
break
}
}
}),
)
for v := range updateCh {
err = v.Error
Expand Down
51 changes: 45 additions & 6 deletions eks/mng/wait/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ func Poll(
mngName string,
desiredNodeGroupStatus string,
initialWait time.Duration,
wait time.Duration,
) <-chan ManagedNodeGroupStatus {
interval time.Duration,
opts ...OpOption) <-chan ManagedNodeGroupStatus {

ret := Op{}
ret.applyOpts(opts)

lg.Info("polling mng",
zap.String("cluster-name", clusterName),
zap.String("mng-name", mngName),
zap.String("desired-status", desiredNodeGroupStatus),
zap.String("initial-wait", initialWait.String()),
zap.String("interval", interval.String()),
)

now := time.Now()
Expand Down Expand Up @@ -83,7 +89,7 @@ func Poll(
// in case stack has already reached desired status
// wait from second interation
if waitDur == time.Duration(0) {
waitDur = wait
waitDur = interval
}
}

Expand Down Expand Up @@ -142,6 +148,10 @@ func Poll(
ch <- ManagedNodeGroupStatus{NodeGroupName: mngName, NodeGroup: nodeGroup, Error: nil}
}

if ret.queryFunc != nil {
ret.queryFunc()
}

if first {
lg.Info("sleeping", zap.Duration("initial-wait", initialWait))
select {
Expand Down Expand Up @@ -219,13 +229,19 @@ func PollUpdate(
requestID string,
desiredUpdateStatus string,
initialWait time.Duration,
wait time.Duration,
) <-chan UpdateStatus {
interval time.Duration,
opts ...OpOption) <-chan UpdateStatus {

ret := Op{}
ret.applyOpts(opts)

lg.Info("polling mng update",
zap.String("cluster-name", clusterName),
zap.String("mng-name", mngName),
zap.String("request-id", requestID),
zap.String("desired-update-status", desiredUpdateStatus),
zap.String("initial-wait", initialWait.String()),
zap.String("interval", interval.String()),
)

now := time.Now()
Expand Down Expand Up @@ -257,7 +273,7 @@ func PollUpdate(
// in case stack has already reached desired status
// wait from second interation
if waitDur == time.Duration(0) {
waitDur = wait
waitDur = interval
}
}

Expand Down Expand Up @@ -315,6 +331,10 @@ func PollUpdate(
ch <- UpdateStatus{Update: update, Error: nil}
}

if ret.queryFunc != nil {
ret.queryFunc()
}

if first {
lg.Info("sleeping", zap.Duration("initial-wait", initialWait))
select {
Expand All @@ -341,3 +361,22 @@ func PollUpdate(
}()
return ch
}

// Op represents a MNG operation.
type Op struct {
queryFunc func()
}

// OpOption configures archiver operations.
type OpOption func(*Op)

// WithQueryFunc configures query function to be called in retry func.
func WithQueryFunc(f func()) OpOption {
return func(op *Op) { op.queryFunc = f }
}

func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
}
}
2 changes: 1 addition & 1 deletion eks/mng/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (ts *tester) waitForNodes(mngName string, retriesLeft int) error {
}
ts.cfg.Logger.Info("node is ready!",
zap.String("name", nodeName),
zap.String("type", fmt.Sprintf("%s", cond.Type)),
zap.String("status-type", fmt.Sprintf("%s", cond.Type)),
zap.String("status", fmt.Sprintf("%s", cond.Status)),
)
readies++
Expand Down
2 changes: 1 addition & 1 deletion eks/ng/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (ts *tester) waitForNodes(asgName string, retriesLeft int) error {
}
ts.cfg.Logger.Info("node is ready!",
zap.String("name", nodeName),
zap.String("type", fmt.Sprintf("%s", cond.Type)),
zap.String("status-type", fmt.Sprintf("%s", cond.Type)),
zap.String("status", fmt.Sprintf("%s", cond.Status)),
)
readies++
Expand Down
3 changes: 2 additions & 1 deletion pkg/k8s-client/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ func waitForJobCompletes(
jobName string,
target int,
opts ...OpOption) (job *batchv1.Job, cronJob *batchv1beta1.CronJob, pods []apiv1.Pod, err error) {

ret := Op{}
ret.applyOpts(opts)

Expand Down Expand Up @@ -707,7 +708,7 @@ func waitForJobCompletes(
return job, cronJob, pods, err
}

// Op represents a SSH operation.
// Op represents a Kubernetes client operation.
type Op struct {
queryFunc func()
forceDelete bool
Expand Down

0 comments on commit 4019d6d

Please sign in to comment.