Skip to content

Commit

Permalink
Merge pull request #606 from weaveworks/waiters-pkg
Browse files Browse the repository at this point in the history
Refactor waiter helpers into a separate package
  • Loading branch information
errordeveloper authored Mar 4, 2019
2 parents c8d6a50 + 9192d9f commit 7672842
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 107 deletions.
138 changes: 31 additions & 107 deletions pkg/cfn/manager/waiters.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package manager

import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/request"
cfn "github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/kris-nova/logger"
"github.com/pkg/errors"

"github.com/weaveworks/eksctl/pkg/cfn/builder"
"github.com/weaveworks/eksctl/pkg/utils/waiters"
)

const (
Expand All @@ -23,129 +21,55 @@ const (
// so this is custom version that is more suitable for our use, as there is no way to add any
// custom acceptors

// makeStatusAcceptor builds a waiter acceptor that uses the
// request.PathAllWaiterMatch matcher on statusPath with status as the expected
// value. The acceptor's state is request.FailureWaiterState; callers that
// need a different state overwrite the State field on the returned value.
func makeStatusAcceptor(status string, statusPath string) request.WaiterAcceptor {
	var acceptor request.WaiterAcceptor
	acceptor.State = request.FailureWaiterState
	acceptor.Matcher = request.PathAllWaiterMatch
	acceptor.Argument = statusPath
	acceptor.Expected = status
	return acceptor
}

// makeAcceptors constructs the acceptor list for a waiter. The first element
// is the success acceptor for successStatus, followed by one failure acceptor
// per entry in failureStates (all matched against statusPath), followed by
// extraAcceptors in the order given.
func makeAcceptors(statusPath string, successStatus string, failureStates []string, extraAcceptors ...request.WaiterAcceptor) []request.WaiterAcceptor {
	success := makeStatusAcceptor(successStatus, statusPath)
	success.State = request.SuccessWaiterState

	result := make([]request.WaiterAcceptor, 0, 1+len(failureStates)+len(extraAcceptors))
	result = append(result, success)
	for i := range failureStates {
		result = append(result, makeStatusAcceptor(failureStates[i], statusPath))
	}

	return append(result, extraAcceptors...)
}

// makeWaiterDelay returns a delay function that yields 15s plus a random
// offset of 0 to 4975ms chosen in 25ms steps, so every delay is at least 15s
// and strictly less than 20s. The attempt number is ignored.
func makeWaiterDelay() request.WaiterDelay {
	const (
		base        = 15 * time.Second
		offsetSteps = 200
		offsetMax   = 5000
		stepMult    = offsetMax / offsetSteps
	)

	return func(_ int) time.Duration {
		// A uniformly random step index is all that is needed; looking it up
		// in a shuffled table yields the same distribution at extra cost.
		offset := time.Duration(stepMult*rand.Intn(offsetSteps)) * time.Millisecond
		return base + offset
	}
}

func (c *StackCollection) waitWithAcceptors(i *Stack, acceptors []request.WaiterAcceptor) error {
desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected)
msg := fmt.Sprintf("waiting for CloudFormation stack %q to reach %q status", *i.StackName, desiredStatus)

ctx, cancel := context.WithTimeout(context.Background(), c.provider.WaitTimeout())
defer cancel()

startTime := time.Now()
msg := fmt.Sprintf("waiting for CloudFormation stack %q", *i.StackName)

w := request.Waiter{
Name: strings.Join([]string{"wait", *i.StackName, desiredStatus}, "_"),
MaxAttempts: 1024, // we use context deadline instead
Delay: makeWaiterDelay(),
Acceptors: acceptors,
NewRequest: func(_ []request.Option) (*request.Request, error) {
input := &cfn.DescribeStacksInput{
StackName: i.StackName,
}
if i.StackId != nil && *i.StackId != "" {
input.StackName = i.StackId
}
logger.Debug(msg)
req, _ := c.provider.CloudFormation().DescribeStacksRequest(input)
req.SetContext(ctx)
return req, nil
},
newRequest := func() *request.Request {
input := &cfn.DescribeStacksInput{
StackName: i.StackName,
}
if i.StackId != nil && *i.StackId != "" {
input.StackName = i.StackId
}
req, _ := c.provider.CloudFormation().DescribeStacksRequest(input)
return req
}

logger.Debug("start %s", msg)

if waitErr := w.WaitWithContext(ctx); waitErr != nil {
troubleshoot := func(desiredStatus string) {
s, err := c.describeStack(i)
if err != nil {
logger.Debug("describeErr=%v", err)
} else {
logger.Critical("unexpected status %q while %s", *s.StackStatus, msg)
c.troubleshootStackFailureCause(i, desiredStatus)
}
return errors.Wrap(waitErr, msg)
}

logger.Debug("done after %s of %s", time.Since(startTime), msg)

return nil
return waiters.Wait(*i.StackName, msg, acceptors, newRequest, c.provider.WaitTimeout(), troubleshoot)
}

func (c *StackCollection) waitWithAcceptorsChangeSet(i *Stack, changesetName *string, acceptors []request.WaiterAcceptor) error {
desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected)
msg := fmt.Sprintf("waiting for CloudFormation changeset %q for stack %q to reach %q status", *changesetName, *i.StackName, desiredStatus)
ctx, cancel := context.WithTimeout(context.Background(), c.provider.WaitTimeout())
defer cancel()
startTime := time.Now()
w := request.Waiter{
Name: strings.Join([]string{"waitCS", *i.StackName, *changesetName, desiredStatus}, "_"),
MaxAttempts: 1024, // we use context deadline instead
Delay: makeWaiterDelay(),
Acceptors: acceptors,
NewRequest: func(_ []request.Option) (*request.Request, error) {
input := &cfn.DescribeChangeSetInput{
StackName: i.StackName,
ChangeSetName: changesetName,
}
logger.Debug(msg)
req, _ := c.provider.CloudFormation().DescribeChangeSetRequest(input)
req.SetContext(ctx)
return req, nil
},
msg := fmt.Sprintf("waiting for CloudFormation changeset %q for stack %q", *changesetName, *i.StackName)

newRequest := func() *request.Request {
input := &cfn.DescribeChangeSetInput{
StackName: i.StackName,
ChangeSetName: changesetName,
}
req, _ := c.provider.CloudFormation().DescribeChangeSetRequest(input)
return req
}
logger.Debug("start %s", msg)
if waitErr := w.WaitWithContext(ctx); waitErr != nil {

troubleshoot := func(desiredStatus string) {
s, err := c.describeStackChangeSet(i, changesetName)
if err != nil {
logger.Debug("describeChangeSetErr=%v", err)
} else {
logger.Critical("unexpected status %q while %s, reason %s", *s.Status, msg, *s.StatusReason)
}
return errors.Wrap(waitErr, msg)
}
logger.Debug("done after %s of %s", time.Since(startTime), msg)
return nil

return waiters.Wait(*i.StackName, msg, acceptors, newRequest, c.provider.WaitTimeout(), troubleshoot)
}

func (c *StackCollection) troubleshootStackFailureCause(i *Stack, desiredStatus string) {
Expand Down Expand Up @@ -187,7 +111,7 @@ func (c *StackCollection) troubleshootStackFailureCause(i *Stack, desiredStatus

func (c *StackCollection) doWaitUntilStackIsCreated(i *Stack) error {
return c.waitWithAcceptors(i,
makeAcceptors(
waiters.MakeAcceptors(
stackStatus,
cfn.StackStatusCreateComplete,
[]string{
Expand Down Expand Up @@ -229,7 +153,7 @@ func (c *StackCollection) waitUntilStackIsCreated(i *Stack, stack builder.Resour

func (c *StackCollection) doWaitUntilStackIsDeleted(i *Stack) error {
return c.waitWithAcceptors(i,
makeAcceptors(
waiters.MakeAcceptors(
stackStatus,
cfn.StackStatusDeleteComplete,
[]string{
Expand Down Expand Up @@ -273,7 +197,7 @@ func (c *StackCollection) waitUntilStackIsDeleted(i *Stack, errs chan error) {

func (c *StackCollection) doWaitUntilStackIsUpdated(i *Stack) error {
return c.waitWithAcceptors(i,
makeAcceptors(
waiters.MakeAcceptors(
stackStatus,
cfn.StackStatusUpdateComplete,
[]string{
Expand All @@ -298,7 +222,7 @@ func (c *StackCollection) doWaitUntilStackIsUpdated(i *Stack) error {

func (c *StackCollection) doWaitUntilChangeSetIsCreated(i *Stack, changesetName *string) error {
return c.waitWithAcceptorsChangeSet(i, changesetName,
makeAcceptors(
waiters.MakeAcceptors(
changesetStatus,
cfn.ChangeSetStatusCreateComplete,
[]string{
Expand Down
95 changes: 95 additions & 0 deletions pkg/utils/waiters/waiters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package waiters

import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/request"
"github.com/kris-nova/logger"
"github.com/pkg/errors"
)

// Wait runs an AWS SDK request.Waiter built from acceptors and newRequest,
// bounded by a context that expires after waitTimeout.
//
// name identifies the thing being waited for and becomes part of the waiter
// name. msg is a human-readable description used in log messages and to wrap
// the returned error. The first acceptor is treated as the success acceptor:
// its Expected value is reported as the desired status. If waiting fails and
// troubleshoot is non-nil, it is called with the desired status so that it can
// find what might have gone wrong.
//
// Wait returns nil on success, an error if acceptors is empty, or the waiter
// error wrapped with msg.
func Wait(name, msg string, acceptors []request.WaiterAcceptor, newRequest func() *request.Request, waitTimeout time.Duration, troubleshoot func(string)) error {
	// acceptors[0] is dereferenced below; fail cleanly instead of panicking.
	if len(acceptors) == 0 {
		return errors.Errorf("%s: no acceptors given", msg)
	}

	desiredStatus := fmt.Sprintf("%v", acceptors[0].Expected)
	msg = fmt.Sprintf("%s to reach %q status", msg, desiredStatus)
	name = strings.Join([]string{"wait", name, desiredStatus}, "_")

	ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
	defer cancel()

	startTime := time.Now()
	w := makeWaiter(ctx, name, msg, acceptors, newRequest)

	logger.Debug("start %s", msg)
	if waitErr := w.WaitWithContext(ctx); waitErr != nil {
		if troubleshoot != nil {
			troubleshoot(desiredStatus)
		}
		return errors.Wrap(waitErr, msg)
	}
	logger.Debug("done after %s of %s", time.Since(startTime), msg)

	return nil
}

// makeWaiter assembles a request.Waiter with the given name and acceptors.
// Each attempt logs msg at debug level, obtains a fresh request from
// newRequest and attaches ctx to it. The delay between attempts comes from
// makeWaiterDelay.
func makeWaiter(ctx context.Context, name, msg string, acceptors []request.WaiterAcceptor, newRequest func() *request.Request) request.Waiter {
	return request.Waiter{
		Name:        name,
		MaxAttempts: 1024, // we use context deadline instead
		Delay:       makeWaiterDelay(),
		Acceptors:   acceptors,
		NewRequest: func(_ []request.Option) (*request.Request, error) {
			// msg is built at runtime from resource names, so it must be
			// passed as an argument rather than used as the format string.
			logger.Debug("%s", msg)
			req := newRequest()
			req.SetContext(ctx)
			return req, nil
		},
	}
}

// MakeAcceptors constructs a slice of request acceptors. The first element is
// the success acceptor for successStatus, followed by one failure acceptor per
// entry in failureStates (all matched against statusPath), followed by
// extraAcceptors in the order given.
func MakeAcceptors(statusPath string, successStatus string, failureStates []string, extraAcceptors ...request.WaiterAcceptor) []request.WaiterAcceptor {
	success := makeStatusAcceptor(successStatus, statusPath)
	success.State = request.SuccessWaiterState

	result := make([]request.WaiterAcceptor, 0, 1+len(failureStates)+len(extraAcceptors))
	result = append(result, success)
	for i := range failureStates {
		result = append(result, makeStatusAcceptor(failureStates[i], statusPath))
	}

	return append(result, extraAcceptors...)
}

// makeStatusAcceptor builds a waiter acceptor that uses the
// request.PathAllWaiterMatch matcher on statusPath with status as the expected
// value. The acceptor's state is request.FailureWaiterState; callers that
// need a different state overwrite the State field on the returned value.
func makeStatusAcceptor(status string, statusPath string) request.WaiterAcceptor {
	var acceptor request.WaiterAcceptor
	acceptor.State = request.FailureWaiterState
	acceptor.Matcher = request.PathAllWaiterMatch
	acceptor.Argument = statusPath
	acceptor.Expected = status
	return acceptor
}

// makeWaiterDelay returns a delay function that yields 15s plus a random
// offset of 0 to 4975ms chosen in 25ms steps, so every delay is at least 15s
// and strictly less than 20s. The attempt number is ignored.
func makeWaiterDelay() request.WaiterDelay {
	const (
		base        = 15 * time.Second
		offsetSteps = 200
		offsetMax   = 5000
		stepMult    = offsetMax / offsetSteps
	)

	return func(_ int) time.Duration {
		// A uniformly random step index is all that is needed; looking it up
		// in a shuffled table yields the same distribution at extra cost.
		offset := time.Duration(stepMult*rand.Intn(offsetSteps)) * time.Millisecond
		return base + offset
	}
}

0 comments on commit 7672842

Please sign in to comment.