Skip to content

Commit

Permalink
Use new client token when CreateVolume returns IdempotentParameterMis…
Browse files Browse the repository at this point in the history
…match

Signed-off-by: Connor Catlett <conncatl@amazon.com>
  • Loading branch information
ConnorJC3 committed Jul 2, 2024
1 parent ef47e10 commit 1395040
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 3 deletions.
26 changes: 24 additions & 2 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -319,6 +320,7 @@ type cloud struct {
rm *retryManager
vwp volumeWaitParameters
likelyBadDeviceNames expiringcache.ExpiringCache[string, sync.Map]
latestClientTokens expiringcache.ExpiringCache[string, int]
}

var _ Cloud = &cloud{}
Expand Down Expand Up @@ -374,6 +376,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
rm: newRetryManager(),
vwp: vwp,
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
latestClientTokens: expiringcache.New[string, int](cacheForgetDelay),
}
}

Expand Down Expand Up @@ -590,8 +593,22 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
}
}

// We hash the volume name to generate a unique token that is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(volumeName))
// The first client token used for any volume is the volume name as provided via CSI
// However, if a volume fails to create asyncronously (that is, the CreateVolume call
// succeeds but the volume ultimately fails to create), the client token is burned until
// EC2 forgets about its use (measured as 12 hours under normal conditions)
//
// To prevent becoming stuck for 12 hours when this occurs, we sequentially append "-2",
// "-3", "-4", etc to the volume name before hashing on the subsequent attempt after a
// volume fails to create because of an IdempotentParameterMismatch AWS error
// The most recent appended value is stored in an expiring cache to prevent memory leaks
tokenBase := volumeName
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
tokenBase += "-" + strconv.Itoa(*tokenNumber)
}

// We use a sha256 hash to guarantee the token that is less than or equal to 64 characters
clientToken := sha256.Sum256([]byte(tokenBase))

requestInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
Expand Down Expand Up @@ -634,6 +651,11 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, ErrNotFound
}
if isAWSErrorIdempotentParameterMismatch(err) {
nextTokenNumber := 2
if tokenNumber, ok := c.latestClientTokens.Get(volumeName); ok {
nextTokenNumber = *tokenNumber + 1
}
c.latestClientTokens.Set(volumeName, &nextTokenNumber)
return nil, ErrIdempotentParameterMismatch
}
return nil, fmt.Errorf("could not create volume in EC2: %w", err)
Expand Down
67 changes: 67 additions & 0 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,72 @@ func TestCreateDisk(t *testing.T) {
}
}

// Test client error IdempotentParameterMismatch by forcing it to progress twice
func TestCreateDiskClientToken(t *testing.T) {
t.Parallel()

const volumeName = "test-vol-client-token"
const volumeId = "vol-abcd1234"
diskOptions := &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: volumeName, AwsEbsDriverTagKey: "true"},
AvailabilityZone: defaultZone,
}

// Hash of "test-vol-client-token"
const expectedClientToken1 = "6a1b29bd7c5c5541d9d6baa2938e954fc5739dc77e97facf23590bd13f8582c2"
// Hash of "test-vol-client-token-2"
const expectedClientToken2 = "21465f5586388bb8804d0cec2df13c00f9a975c8cddec4bc35e964cdce59015b"
// Hash of "test-vol-client-token-3"
const expectedClientToken3 = "1bee5a79d83981c0041df2c414bb02e0c10aeb49343b63f50f71470edbaa736b"

mockCtrl := gomock.NewController(t)
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)

gomock.InOrder(
mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
assert.Equal(t, expectedClientToken1, *input.ClientToken)
return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"}
}),
mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
assert.Equal(t, expectedClientToken2, *input.ClientToken)
return nil, &smithy.GenericAPIError{Code: "IdempotentParameterMismatch"}
}),
mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, input *ec2.CreateVolumeInput, _ ...func(*ec2.Options)) (*ec2.CreateVolumeOutput, error) {
assert.Equal(t, expectedClientToken3, *input.ClientToken)
return &ec2.CreateVolumeOutput{
VolumeId: aws.String(volumeId),
Size: aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)),
}, nil
}),
mockEC2.EXPECT().DescribeVolumes(gomock.Any(), gomock.Any()).Return(&ec2.DescribeVolumesOutput{
Volumes: []types.Volume{
{
VolumeId: aws.String(volumeId),
Size: aws.Int32(util.BytesToGiB(diskOptions.CapacityBytes)),
State: types.VolumeState("available"),
AvailabilityZone: aws.String(diskOptions.AvailabilityZone),
},
},
}, nil).AnyTimes(),
)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(defaultCreateDiskDeadline))
defer cancel()
for i := range 3 {
_, err := c.CreateDisk(ctx, volumeName, diskOptions)
if i < 2 {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
}

func TestDeleteDisk(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -3076,6 +3142,7 @@ func newCloud(mockEC2 EC2API) Cloud {
rm: newRetryManager(),
vwp: testVolumeWaitParameters(),
likelyBadDeviceNames: expiringcache.New[string, sync.Map](cacheForgetDelay),
latestClientTokens: expiringcache.New[string, int](cacheForgetDelay),
}
return c
}
Expand Down
1 change: 0 additions & 1 deletion pkg/cloud/devicemanager/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames *syn
finalResortName := ""
likelyBadNames.Range(func(name, _ interface{}) bool {
if name, ok := name.(string); ok {
fmt.Println(name)
if _, existing := existingNames[name]; !existing {
finalResortName = name
return false
Expand Down
7 changes: 7 additions & 0 deletions pkg/cloud/devicemanager/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func TestNameAllocatorLikelyBadName(t *testing.T) {
})
}

onlyExisting := new(sync.Map)
onlyExisting.Store(skippedNameExisting, struct{}{})
_, err := allocator.GetNext(existingNames, onlyExisting)
if err != nil {
t.Errorf("got nil when error expected (likelyBadNames with only existing names)")
}

lastName, _ := allocator.GetNext(existingNames, likelyBadNames)
if lastName != skippedNameNew {
t.Errorf("test %q: expected %q, got %q (likelyBadNames fallback)", skippedNameNew, skippedNameNew, lastName)
Expand Down

0 comments on commit 1395040

Please sign in to comment.