Migrate likelyBadNames to ExpiringCache
Signed-off-by: Connor Catlett <conncatl@amazon.com>
ConnorJC3 committed Jun 25, 2024
1 parent 3bfb353 commit 0aaf246
Showing 6 changed files with 75 additions and 93 deletions.
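In summary: the hand-rolled nodeDeviceCache (a package-level map guarded by cacheMutex, with a per-node time.AfterFunc eviction timer) is replaced by the repo's generic util.ExpiringCache, keyed by node ID, with each entry holding a *sync.Map of likely-bad device names. A minimal sketch of the new access pattern follows; the cache type here is a hypothetical stand-in for util.ExpiringCache[string, sync.Map] (a plain map suffices for illustration because expiry is not modeled), and the node ID is made up:

package main

import (
    "fmt"
    "sync"
)

// cache is a hypothetical stand-in for util.ExpiringCache[string, sync.Map];
// expiry is deliberately not modeled in this sketch.
type cache map[string]*sync.Map

func (c cache) Get(key string) (*sync.Map, bool) { m, ok := c[key]; return m, ok }
func (c cache) Set(key string, m *sync.Map)      { c[key] = m }

func main() {
    likelyBadNames := cache{}

    // Get-or-create, mirroring the new AttachDisk flow in this commit.
    nodeID := "i-hypothetical"
    names, ok := likelyBadNames.Get(nodeID)
    if !ok {
        names = new(sync.Map)
        likelyBadNames.Set(nodeID, names)
    }

    // An "already in use" attach failure records the device path...
    names.Store("/dev/xvdba", struct{}{})
    // ...and a later successful attach on that path clears it.
    names.Delete("/dev/xvdba")
    fmt.Println("nodes cached:", len(likelyBadNames))
}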
46 changes: 15 additions & 31 deletions pkg/cloud/cloud.go
@@ -91,6 +91,7 @@ var (
const (
volumeDetachedState = "detached"
volumeAttachedState = "attached"
cacheForgetDelay = 1 * time.Hour
)

// AWS provisioning limits.
@@ -316,7 +317,7 @@ type cloud struct {
bm *batcherManager
rm *retryManager
vwp volumeWaitParameters
likelyBadNames util.ExpiringCache[string, map[string]struct{}]
likelyBadNames util.ExpiringCache[string, sync.Map]
}

var _ Cloud = &cloud{}
@@ -371,7 +372,7 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
bm: bm,
rm: newRetryManager(),
vwp: vwp,
likelyBadNames: util.NewExpiringCache[string, map[string]struct{}](cacheForgetDelay),
likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
}
}

@@ -855,7 +856,11 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
return "", err
}

likelyBadNames, _ := c.likelyBadNames[nodeID]
likelyBadNames, ok := c.likelyBadNames.Get(nodeID)
if !ok {
likelyBadNames = new(sync.Map)
c.likelyBadNames.Set(nodeID, likelyBadNames)
}

device, err := c.dm.NewDevice(instance, volumeID, likelyBadNames)
if err != nil {
@@ -875,37 +880,16 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
})
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Node already had existing cached bad names, add on to the list
node.likelyBadNames[device.Path] = struct{}{}
node.timer.Reset(cacheForgetDelay)
} else {
// Node has no existing cached bad device names, setup a new struct instance
nodeDeviceCache[nodeID] = cachedNode{
timer: time.AfterFunc(cacheForgetDelay, func() {
// If this ever fires, the node has not had a volume attached for an hour
// In order to prevent a semi-permanent memory leak, delete it from the map
cacheMutex.Lock()
delete(nodeDeviceCache, nodeID)
cacheMutex.Unlock()
}),
likelyBadNames: map[string]struct{}{
device.Path: {},
},
}
}
cacheMutex.Unlock()
// If block device is "in use", that likely indicates a bad name that is in use by a block
// device that we do not know about (example: block devices attached in the AMI, which are
// not reported in DescribeInstance's block device map)
//
// Store such bad names in the "likely bad" map to be considered last in future attempts
likelyBadNames.Store(device.Path, struct{}{})
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
cacheMutex.Lock()
if node, ok := nodeDeviceCache[nodeID]; ok {
// Remove successfully attached devices from the "likely bad" list
delete(node.likelyBadNames, device.Path)
node.timer.Reset(cacheForgetDelay)
}
cacheMutex.Unlock()
likelyBadNames.Delete(device.Path)
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}

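The two call sites above rely on sync.Map's concurrency-safe operations: Store records a bad name after an "in use" failure, and Delete clears it once an attach succeeds. A runnable stdlib-only illustration (the device path is chosen arbitrarily):

package main

import (
    "fmt"
    "sync"
)

func main() {
    likelyBadNames := new(sync.Map)

    // Attach failed with "already in use": remember the path.
    likelyBadNames.Store("/dev/xvdaa", struct{}{})

    // The allocator's membership check: Load returns (value, ok).
    if _, bad := likelyBadNames.Load("/dev/xvdaa"); bad {
        fmt.Println("/dev/xvdaa is likely bad; consider it last")
    }

    // Attach succeeded on this path: forget it again.
    likelyBadNames.Delete("/dev/xvdaa")
    if _, bad := likelyBadNames.Load("/dev/xvdaa"); !bad {
        fmt.Println("/dev/xvdaa cleared")
    }
}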
68 changes: 26 additions & 42 deletions pkg/cloud/cloud_test.go
@@ -20,13 +20,14 @@ import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"reflect"
"strings"
"sync"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
@@ -1341,14 +1342,13 @@ func TestAttachDisk(t *testing.T) {
}

testCases := []struct {
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
validateFunc func(t *testing.T)
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
}{
{
name: "success: AttachVolume normal",
@@ -1377,32 +1377,30 @@
name: "success: AttachVolume skip likely bad name",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
nodeID2: defaultNodeID, // Induce second attach
path: "/dev/xvdab",
expErr: nil,
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, blockDeviceInUseErr),
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)
attachRequest1 := createAttachRequest(volumeID, nodeID, defaultPath)
attachRequest2 := createAttachRequest(volumeID, nodeID, path)

gomock.InOrder(
// First call - fail with "already in use" error
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest1), gomock.Any()).Return(nil, blockDeviceInUseErr),

// Second call - succeed, expect bad device name to be skipped
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest2), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
VolumeId: aws.String(volumeID),
State: types.VolumeAttachmentStateAttaching,
}, nil),
mockEC2.EXPECT().DescribeVolumes(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),
)

nodeDeviceCache = map[string]cachedNode{
defaultNodeID: {
timer: time.NewTimer(1 * time.Hour),
likelyBadNames: map[string]struct{}{
defaultPath: {},
},
},
}
},
},
{
Expand All @@ -1416,7 +1414,7 @@ func TestAttachDisk(t *testing.T) {
instanceRequest := createInstanceRequest(nodeID)

fakeInstance := newFakeInstance(nodeID, volumeID, path)
_, err := dm.NewDevice(&fakeInstance, volumeID, map[string]struct{}{})
_, err := dm.NewDevice(&fakeInstance, volumeID, new(sync.Map))
require.NoError(t, err)

gomock.InOrder(
@@ -1439,9 +1437,6 @@
mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest, gomock.Any()).Return(nil, errors.New("AttachVolume error")),
)
},
validateFunc: func(t *testing.T) {
assert.NotContains(t, nodeDeviceCache, defaultNodeID)
},
},
{
name: "fail: AttachVolume returned block device already in use error",
@@ -1458,11 +1453,6 @@
mockEC2.EXPECT().AttachVolume(ctx, attachRequest, gomock.Any()).Return(nil, blockDeviceInUseErr),
)
},
validateFunc: func(t *testing.T) {
assert.Contains(t, nodeDeviceCache, defaultNodeID)
assert.NotNil(t, nodeDeviceCache[defaultNodeID].timer)
assert.Contains(t, nodeDeviceCache[defaultNodeID].likelyBadNames, defaultPath)
},
},
{
name: "success: AttachVolume multi-attach",
@@ -1524,9 +1514,6 @@

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Reset node likely bad names cache
nodeDeviceCache = map[string]cachedNode{}

mockCtrl := gomock.NewController(t)
mockEC2 := NewMockEC2API(mockCtrl)
c := newCloud(mockEC2)
@@ -1552,10 +1539,6 @@
assert.Equal(t, tc.path, devicePath)
}

if tc.validateFunc != nil {
tc.validateFunc(t)
}

mockCtrl.Finish()
})
}
@@ -3086,11 +3069,12 @@ func testVolumeWaitParameters() volumeWaitParameters {

func newCloud(mockEC2 EC2API) Cloud {
c := &cloud{
region: "test-region",
dm: dm.NewDeviceManager(),
ec2: mockEC2,
rm: newRetryManager(),
vwp: testVolumeWaitParameters(),
region: "test-region",
dm: dm.NewDeviceManager(),
ec2: mockEC2,
rm: newRetryManager(),
vwp: testVolumeWaitParameters(),
likelyBadNames: util.NewExpiringCache[string, sync.Map](cacheForgetDelay),
}
return c
}
22 changes: 16 additions & 6 deletions pkg/cloud/devicemanager/allocator.go
@@ -18,6 +18,7 @@ package devicemanager

import (
"fmt"
"sync"
)

// ExistingNames is a map of assigned device names. Presence of a key with a device
@@ -34,7 +35,7 @@ type ExistingNames map[string]string
// call), so all available device names are used eventually and it minimizes
// device name reuse.
type NameAllocator interface {
GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (name string, err error)
GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (name string, err error)
}

type nameAllocator struct{}
@@ -46,18 +47,27 @@ var _ NameAllocator = &nameAllocator{}
//
// likelyBadNames is a map of names that have previously returned an "in use" error when attempting to mount to them
// These names are unlikely to result in a successful mount, and may be permanently unavailable, so use them last
func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames map[string]struct{}) (string, error) {
func (d *nameAllocator) GetNext(existingNames ExistingNames, likelyBadNames *sync.Map) (string, error) {
for _, name := range deviceNames {
_, existing := existingNames[name]
_, likelyBad := likelyBadNames[name]
_, likelyBad := likelyBadNames.Load(name)
if !existing && !likelyBad {
return name, nil
}
}
for name := range likelyBadNames {
if _, existing := existingNames[name]; !existing {
return name, nil

finalResortName := ""
likelyBadNames.Range(func(name, _ interface{}) bool {
if name, ok := name.(string); ok {
if _, existing := existingNames[name]; !existing {
finalResortName = name
return true
}
}
return false
})
if finalResortName != "" {
return finalResortName, nil
}

return "", fmt.Errorf("there are no names available")
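The fallback above depends on sync.Map.Range semantics: the callback is invoked once per entry, iteration order is unspecified, and returning false stops the walk, so the first usable likely-bad name wins. A self-contained illustration:

package main

import (
    "fmt"
    "sync"
)

func main() {
    existingNames := map[string]string{"/dev/xvdaa": "vol-1"}

    likelyBadNames := new(sync.Map)
    likelyBadNames.Store("/dev/xvdaa", struct{}{}) // already assigned, must be skipped
    likelyBadNames.Store("/dev/xvdab", struct{}{}) // free, eligible as last resort

    finalResortName := ""
    likelyBadNames.Range(func(name, _ interface{}) bool {
        if name, ok := name.(string); ok {
            if _, existing := existingNames[name]; !existing {
                finalResortName = name
                return false // stop at the first available candidate
            }
        }
        return true // keep scanning
    })
    fmt.Println("last resort:", finalResortName) // /dev/xvdab
}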
13 changes: 8 additions & 5 deletions pkg/cloud/devicemanager/allocator_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package devicemanager

import (
"sync"
"testing"
)

@@ -26,7 +27,7 @@ func TestNameAllocator(t *testing.T) {

for _, name := range deviceNames {
t.Run(name, func(t *testing.T) {
actual, err := allocator.GetNext(existingNames, map[string]struct{}{})
actual, err := allocator.GetNext(existingNames, new(sync.Map))
if err != nil {
t.Errorf("test %q: unexpected error: %v", name, err)
}
@@ -40,6 +41,8 @@

func TestNameAllocatorLikelyBadName(t *testing.T) {
skippedName := deviceNames[32]
likelyBadNames := new(sync.Map)
likelyBadNames.Store(skippedName, struct{}{})
existingNames := map[string]string{}
allocator := nameAllocator{}

@@ -50,7 +53,7 @@
}

t.Run(name, func(t *testing.T) {
actual, err := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
actual, err := allocator.GetNext(existingNames, likelyBadNames)
if err != nil {
t.Errorf("test %q: unexpected error: %v", name, err)
}
@@ -61,7 +64,7 @@
})
}

lastName, _ := allocator.GetNext(existingNames, map[string]struct{}{skippedName: {}})
lastName, _ := allocator.GetNext(existingNames, likelyBadNames)
if lastName != skippedName {
t.Errorf("test %q: expected %q, got %q (likelyBadNames fallback)", skippedName, skippedName, lastName)
}
@@ -72,10 +75,10 @@
existingNames := map[string]string{}

for i := 0; i < len(deviceNames); i++ {
name, _ := allocator.GetNext(existingNames, map[string]struct{}{})
name, _ := allocator.GetNext(existingNames, new(sync.Map))
existingNames[name] = ""
}
name, err := allocator.GetNext(existingNames, map[string]struct{}{})
name, err := allocator.GetNext(existingNames, new(sync.Map))
if err == nil {
t.Errorf("expected error, got device %q", name)
}
4 changes: 2 additions & 2 deletions pkg/cloud/devicemanager/manager.go
@@ -52,7 +52,7 @@ type DeviceManager interface {
// NewDevice retrieves the device if the device is already assigned.
// Otherwise it creates a new device with next available device name
// and mark it as unassigned device.
NewDevice(instance *types.Instance, volumeID string, likelyBadNames map[string]struct{}) (device *Device, err error)
NewDevice(instance *types.Instance, volumeID string, likelyBadNames *sync.Map) (device *Device, err error)

// GetDevice returns the device already assigned to the volume.
GetDevice(instance *types.Instance, volumeID string) (device *Device, err error)
@@ -103,7 +103,7 @@ func NewDeviceManager() DeviceManager {
}
}

func (d *deviceManager) NewDevice(instance *types.Instance, volumeID string, likelyBadNames map[string]struct{}) (*Device, error) {
func (d *deviceManager) NewDevice(instance *types.Instance, volumeID string, likelyBadNames *sync.Map) (*Device, error) {
d.mux.Lock()
defer d.mux.Unlock()

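Because the NewDevice parameter is now *sync.Map, callers that previously built a map[string]struct{} literal need a conversion; the tests in this commit simply pass new(sync.Map) when no history exists. The toSyncMap helper below is hypothetical (not part of this commit) and only sketches the stdlib translation:

package main

import (
    "fmt"
    "sync"
)

// toSyncMap is a hypothetical helper (not in this commit): it copies a plain
// string-set into the *sync.Map shape that NewDevice now expects.
func toSyncMap(set map[string]struct{}) *sync.Map {
    m := new(sync.Map)
    for name := range set {
        m.Store(name, struct{}{})
    }
    return m
}

func main() {
    m := toSyncMap(map[string]struct{}{"/dev/xvdba": {}})
    _, ok := m.Load("/dev/xvdba")
    fmt.Println(ok) // true
}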
