diff --git a/pkg/cfn/manager/waiters.go b/pkg/cfn/manager/waiters.go index 0c07125242..df477e9cdc 100644 --- a/pkg/cfn/manager/waiters.go +++ b/pkg/cfn/manager/waiters.go @@ -1,17 +1,15 @@ package manager import ( - "context" "fmt" - "math/rand" - "strings" - "time" "github.com/aws/aws-sdk-go/aws/request" cfn "github.com/aws/aws-sdk-go/service/cloudformation" "github.com/kris-nova/logger" "github.com/pkg/errors" + "github.com/weaveworks/eksctl/pkg/cfn/builder" + "github.com/weaveworks/eksctl/pkg/utils/waiters" ) const ( @@ -23,80 +21,21 @@ const ( // so this is custom version that is more suitable for our use, as there is no way to add any // custom acceptors -func makeStatusAcceptor(status string, statusPath string) request.WaiterAcceptor { - return request.WaiterAcceptor{ - Matcher: request.PathAllWaiterMatch, - Argument: statusPath, - Expected: status, - State: request.FailureWaiterState, - } -} - -func makeAcceptors(statusPath string, successStatus string, failureStates []string, extraAcceptors ...request.WaiterAcceptor) []request.WaiterAcceptor { - acceptors := []request.WaiterAcceptor{makeStatusAcceptor(successStatus, statusPath)} - acceptors[0].State = request.SuccessWaiterState - - for _, s := range failureStates { - acceptors = append(acceptors, makeStatusAcceptor(s, statusPath)) - } - - acceptors = append(acceptors, extraAcceptors...) - - return acceptors -} - -// makeWaiterDelay returns delay ranging between 15s and 20s -func makeWaiterDelay() request.WaiterDelay { - const ( - base = 15 * time.Second - offsetSteps = 200 - offsetMax = 5000 - stepMult = offsetMax / offsetSteps - ) - - offsets := rand.Perm(offsetSteps) - - return func(attempt int) time.Duration { - s := rand.Intn(offsetSteps) - d := stepMult * offsets[s] - - offset := time.Duration(d) * time.Millisecond - - return base + offset - } -} - func (c *StackCollection) waitWithAcceptors(i *Stack, acceptors []request.WaiterAcceptor) error { - desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected) - msg := fmt.Sprintf("waiting for CloudFormation stack %q to reach %q status", *i.StackName, desiredStatus) - - ctx, cancel := context.WithTimeout(context.Background(), c.provider.WaitTimeout()) - defer cancel() - - startTime := time.Now() + msg := fmt.Sprintf("waiting for CloudFormation stack %q", *i.StackName) - w := request.Waiter{ - Name: strings.Join([]string{"wait", *i.StackName, desiredStatus}, "_"), - MaxAttempts: 1024, // we use context deadline instead - Delay: makeWaiterDelay(), - Acceptors: acceptors, - NewRequest: func(_ []request.Option) (*request.Request, error) { - input := &cfn.DescribeStacksInput{ - StackName: i.StackName, - } - if i.StackId != nil && *i.StackId != "" { - input.StackName = i.StackId - } - logger.Debug(msg) - req, _ := c.provider.CloudFormation().DescribeStacksRequest(input) - req.SetContext(ctx) - return req, nil - }, + newRequest := func() *request.Request { + input := &cfn.DescribeStacksInput{ + StackName: i.StackName, + } + if i.StackId != nil && *i.StackId != "" { + input.StackName = i.StackId + } + req, _ := c.provider.CloudFormation().DescribeStacksRequest(input) + return req } - logger.Debug("start %s", msg) - - if waitErr := w.WaitWithContext(ctx); waitErr != nil { + troubleshoot := func(desiredStatus string) { s, err := c.describeStack(i) if err != nil { logger.Debug("describeErr=%v", err) @@ -104,48 +43,33 @@ func (c *StackCollection) waitWithAcceptors(i *Stack, acceptors []request.Waiter logger.Critical("unexpected status %q while %s", *s.StackStatus, msg) c.troubleshootStackFailureCause(i, desiredStatus) } - return errors.Wrap(waitErr, msg) } - logger.Debug("done after %s of %s", time.Since(startTime), msg) - - return nil + return waiters.Wait(*i.StackName, msg, acceptors, newRequest, c.provider.WaitTimeout(), troubleshoot) } func (c *StackCollection) waitWithAcceptorsChangeSet(i *Stack, changesetName *string, acceptors []request.WaiterAcceptor) error { - desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected) - msg := fmt.Sprintf("waiting for CloudFormation changeset %q for stack %q to reach %q status", *changesetName, *i.StackName, desiredStatus) - ctx, cancel := context.WithTimeout(context.Background(), c.provider.WaitTimeout()) - defer cancel() - startTime := time.Now() - w := request.Waiter{ - Name: strings.Join([]string{"waitCS", *i.StackName, *changesetName, desiredStatus}, "_"), - MaxAttempts: 1024, // we use context deadline instead - Delay: makeWaiterDelay(), - Acceptors: acceptors, - NewRequest: func(_ []request.Option) (*request.Request, error) { - input := &cfn.DescribeChangeSetInput{ - StackName: i.StackName, - ChangeSetName: changesetName, - } - logger.Debug(msg) - req, _ := c.provider.CloudFormation().DescribeChangeSetRequest(input) - req.SetContext(ctx) - return req, nil - }, + msg := fmt.Sprintf("waiting for CloudFormation changeset %q for stack %q", *changesetName, *i.StackName) + + newRequest := func() *request.Request { + input := &cfn.DescribeChangeSetInput{ + StackName: i.StackName, + ChangeSetName: changesetName, + } + req, _ := c.provider.CloudFormation().DescribeChangeSetRequest(input) + return req } - logger.Debug("start %s", msg) - if waitErr := w.WaitWithContext(ctx); waitErr != nil { + + troubleshoot := func(desiredStatus string) { s, err := c.describeStackChangeSet(i, changesetName) if err != nil { logger.Debug("describeChangeSetErr=%v", err) } else { logger.Critical("unexpected status %q while %s, reason %s", *s.Status, msg, *s.StatusReason) } - return errors.Wrap(waitErr, msg) } - logger.Debug("done after %s of %s", time.Since(startTime), msg) - return nil + + return waiters.Wait(*i.StackName, msg, acceptors, newRequest, c.provider.WaitTimeout(), troubleshoot) } func (c *StackCollection) troubleshootStackFailureCause(i *Stack, desiredStatus string) { @@ -187,7 +111,7 @@ func (c *StackCollection) troubleshootStackFailureCause(i *Stack, desiredStatus func (c *StackCollection) doWaitUntilStackIsCreated(i *Stack) error { return c.waitWithAcceptors(i, - makeAcceptors( + waiters.MakeAcceptors( stackStatus, cfn.StackStatusCreateComplete, []string{ @@ -229,7 +153,7 @@ func (c *StackCollection) waitUntilStackIsCreated(i *Stack, stack builder.Resour func (c *StackCollection) doWaitUntilStackIsDeleted(i *Stack) error { return c.waitWithAcceptors(i, - makeAcceptors( + waiters.MakeAcceptors( stackStatus, cfn.StackStatusDeleteComplete, []string{ @@ -273,7 +197,7 @@ func (c *StackCollection) waitUntilStackIsDeleted(i *Stack, errs chan error) { func (c *StackCollection) doWaitUntilStackIsUpdated(i *Stack) error { return c.waitWithAcceptors(i, - makeAcceptors( + waiters.MakeAcceptors( stackStatus, cfn.StackStatusUpdateComplete, []string{ @@ -298,7 +222,7 @@ func (c *StackCollection) doWaitUntilStackIsUpdated(i *Stack) error { func (c *StackCollection) doWaitUntilChangeSetIsCreated(i *Stack, changesetName *string) error { return c.waitWithAcceptorsChangeSet(i, changesetName, - makeAcceptors( + waiters.MakeAcceptors( changesetStatus, cfn.ChangeSetStatusCreateComplete, []string{ diff --git a/pkg/utils/waiters/waiters.go b/pkg/utils/waiters/waiters.go new file mode 100644 index 0000000000..1e19070ec8 --- /dev/null +++ b/pkg/utils/waiters/waiters.go @@ -0,0 +1,95 @@ +package waiters + +import ( + "context" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws/request" + "github.com/kris-nova/logger" + "github.com/pkg/errors" +) + +// Wait for something with a name to reach status that is expressed by acceptors using newRequest +// until we hit waitTimeout, on unexpected status troubleshoot will be called with the desired +// status as an argument, so that it can find what migth have gone wrong +func Wait(name, msg string, acceptors []request.WaiterAcceptor, newRequest func() *request.Request, waitTimeout time.Duration, troubleshoot func(string)) error { + desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected) + msg = fmt.Sprintf("%s to reach %q status", msg, desiredStatus) + name = strings.Join([]string{"wait", name, desiredStatus}, "_") + + ctx, cancel := context.WithTimeout(context.Background(), waitTimeout) + defer cancel() + startTime := time.Now() + w := makeWaiter(ctx, name, msg, acceptors, newRequest) + logger.Debug("start %s", msg) + if waitErr := w.WaitWithContext(ctx); waitErr != nil { + if troubleshoot != nil { + troubleshoot(desiredStatus) + } + return errors.Wrap(waitErr, msg) + } + logger.Debug("done after %s of %s", time.Since(startTime), msg) + return nil +} + +func makeWaiter(ctx context.Context, name, msg string, acceptors []request.WaiterAcceptor, newRequest func() *request.Request) request.Waiter { + return request.Waiter{ + Name: name, + MaxAttempts: 1024, // we use context deadline instead + Delay: makeWaiterDelay(), + Acceptors: acceptors, + NewRequest: func(_ []request.Option) (*request.Request, error) { + logger.Debug(msg) + req := newRequest() + req.SetContext(ctx) + return req, nil + }, + } +} + +// MakeAcceptors constructs a slice of request acceptors +func MakeAcceptors(statusPath string, successStatus string, failureStates []string, extraAcceptors ...request.WaiterAcceptor) []request.WaiterAcceptor { + acceptors := []request.WaiterAcceptor{makeStatusAcceptor(successStatus, statusPath)} + acceptors[0].State = request.SuccessWaiterState + + for _, s := range failureStates { + acceptors = append(acceptors, makeStatusAcceptor(s, statusPath)) + } + + acceptors = append(acceptors, extraAcceptors...) + + return acceptors +} + +func makeStatusAcceptor(status string, statusPath string) request.WaiterAcceptor { + return request.WaiterAcceptor{ + Matcher: request.PathAllWaiterMatch, + Argument: statusPath, + Expected: status, + State: request.FailureWaiterState, + } +} + +// makeWaiterDelay returns delay ranging between 15s and 20s +func makeWaiterDelay() request.WaiterDelay { + const ( + base = 15 * time.Second + offsetSteps = 200 + offsetMax = 5000 + stepMult = offsetMax / offsetSteps + ) + + offsets := rand.Perm(offsetSteps) + + return func(attempt int) time.Duration { + s := rand.Intn(offsetSteps) + d := stepMult * offsets[s] + + offset := time.Duration(d) * time.Millisecond + + return base + offset + } +}