Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new client token when CreateVolume returns IdempotentParameterMismatch #2075

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 52 additions & 62 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/aws/smithy-go"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/batcher"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/expiringcache"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -91,6 +93,7 @@ var (
const (
volumeDetachedState = "detached"
volumeAttachedState = "attached"
cacheForgetDelay = 1 * time.Hour
AndrewSirenko marked this conversation as resolved.
Show resolved Hide resolved
)

// AWS provisioning limits.
Expand Down Expand Up @@ -310,12 +313,14 @@ type batcherManager struct {
}

type cloud struct {
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
region string
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
likelyBadDeviceNames expiringcache.ExpiringCache[string, sync.Map]
latestClientTokens expiringcache.ExpiringCache[string, int]
}

var _ Cloud = &cloud{}
Expand Down Expand Up @@ -364,12 +369,14 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
}

return &cloud{
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: newRetryManager(),
vwp: vwp,
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
latestClientTokens: expiringcache.New[string, int](cacheForgetDelay),
}
}

Expand Down Expand Up @@ -586,8 +593,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
}
}

// We hash the volume name to generate a unique token that is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(volumeName))
// The first client token used for any volume is the volume name as provided via CSI
// However, if a volume fails to create asynchronously (that is, the CreateVolume call
// succeeds but the volume ultimately fails to create), the client token is burned until
// EC2 forgets about its use (measured as 12 hours under normal conditions)
//
// To prevent becoming stuck for 12 hours when this occurs, we sequentially append "-2",
// "-3", "-4", etc to the volume name before hashing on the subsequent attempt after a
// volume fails to create because of an IdempotentParameterMismatch AWS error
// The most recent appended value is stored in an expiring cache to prevent memory leaks
tokenBase := volumeName
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
tokenBase += "-" + strconv.Itoa(*tokenNumber)
}

// We use a sha256 hash to guarantee a token that is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(tokenBase))

requestInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Expand Down Expand Up @@ -630,6 +651,11 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, ErrNotFound
}
if isAWSErrorIdempotentParameterMismatch(err) {
nextTokenNumber := 2
ConnorJC3 marked this conversation as resolved.
Show resolved Hide resolved
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
nextTokenNumber = *tokenNumber + 1
}
c.latestClientTokens.Set(volumeName, &nextTokenNumber)
return nil, ErrIdempotentParameterMismatch
}
return nil, fmt.Errorf("could not create volume in EC2: %w", err)
Expand Down Expand Up @@ -847,34 +873,19 @@ func (c *cloud) batchDescribeInstances(request *ec2.DescribeInstancesInput) (*ty
return r.Result, nil
}

// Node likely bad device names cache
// Remember device names that are already in use on an instance and use them last when attaching volumes
// This works around device names that are used but do not appear in the mapping from DescribeInstanceStatus
const cacheForgetDelay = 1 * time.Hour

type cachedNode struct {
timer *time.Timer
likelyBadNames map[string]struct{}
}

var cacheMutex sync.Mutex
var nodeDeviceCache map[string]cachedNode = map[string]cachedNode{}

func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}

likelyBadNames := map[string]struct{}{}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
likelyBadNames = node.likelyBadNames
node.timer.Reset(cacheForgetDelay)
likelyBadDeviceNames, ok := c.likelyBadDeviceNames.Get(nodeID)
if !ok {
likelyBadDeviceNames = new(sync.Map)
c.likelyBadDeviceNames.Set(nodeID, likelyBadDeviceNames)
}
cacheMutex.Unlock()

device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
device, err := c.dm.NewDevice(instance, volumeID, likelyBadDeviceNames)
if err != nil {
return "", err
}
Expand All @@ -892,37 +903,16 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
})
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Node already had existing cached bad names, add on to the list
node.likelyBadNames[device.Path] = struct{}{}
node.timer.Reset(cacheForgetDelay)
} else {
// Node has no existing cached bad device names, setup a new struct instance
nodeDeviceCache[nodeID] = cachedNode{
timer: time.AfterFunc(cacheForgetDelay, func() {
// If this ever fires, the node has not had a volume attached for an hour
// In order to prevent a semi-permanent memory leak, delete it from the map
cacheMutex.Lock()
delete(nodeDeviceCache, nodeID)
cacheMutex.Unlock()
}),
likelyBadNames: map[string]struct{}{
device.Path: {},
},
}
}
cacheMutex.Unlock()
// If block device is "in use", that likely indicates a bad name that is in use by a block
// device that we do not know about (example: block devices attached in the AMI, which are
// not reported in DescribeInstance's block device map)
//
// Store such bad names in the "likely bad" map to be considered last in future attempts
likelyBadDeviceNames.Store(device.Path, struct{}{})
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Remove successfully attached devices from the "likely bad" list
delete(node.likelyBadNames, device.Path)
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()
likelyBadDeviceNames.Delete(device.Path)
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}

Expand Down
Loading
Loading