diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 411c01bc64..c468bfa16b 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -265,11 +265,12 @@ type snapshotBatcherType int // batcherManager maintains a collection of batchers for different types of tasks. type batcherManager struct { - volumeIDBatcher *batcher.Batcher[string, *types.Volume] - volumeTagBatcher *batcher.Batcher[string, *types.Volume] - instanceIDBatcher *batcher.Batcher[string, *types.Instance] - snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot] - snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot] + volumeIDBatcher *batcher.Batcher[string, *types.Volume] + volumeTagBatcher *batcher.Batcher[string, *types.Volume] + instanceIDBatcher *batcher.Batcher[string, *types.Instance] + snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot] + snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot] + volumeModificationIDBatcher *batcher.Batcher[string, *types.VolumeModification] } type cloud struct { @@ -358,6 +359,9 @@ func newBatcherManager(svc EC2API) *batcherManager { snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) { return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher) }), + volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) { + return execBatchDescribeVolumesModifications(svc, names) + }), } } @@ -638,6 +642,58 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil } +// execBatchDescribeVolumesModifications executes a batched DescribeVolumesModifications API call +func execBatchDescribeVolumesModifications(svc EC2API, input []string) (map[string]*types.VolumeModification, error) { + klog.V(7).InfoS("execBatchDescribeVolumeModifications", "volumeIds", input) + request := &ec2.DescribeVolumesModificationsInput{ + VolumeIds: input, + } + + ctx, cancel := context.WithTimeout(context.Background(), batchDescribeTimeout) + defer cancel() + + resp, err := describeVolumesModifications(ctx, svc, request) + if err != nil { + return nil, err + } + + result := make(map[string]*types.VolumeModification) + + for _, i := range resp { + volumeModification := i + result[*volumeModification.VolumeId] = &volumeModification + } + + klog.V(7).InfoS("execBatchDescribeVolumeModifications: success", "result", result) + return result, nil +} + +// batchDescribeVolumesModifications processes a DescribeVolumesModifications request by queuing the task and waiting for the result. +func (c *cloud) batchDescribeVolumesModifications(request *ec2.DescribeVolumesModificationsInput) (*types.VolumeModification, error) { + var task string + + if len(request.VolumeIds) == 1 && request.VolumeIds[0] != "" { + task = request.VolumeIds[0] + } else { + return nil, fmt.Errorf("batchDescribeVolumesModifications: invalid request, request: %v", request) + } + + ch := make(chan batcher.BatchResult[*types.VolumeModification]) + + b := c.bm.volumeModificationIDBatcher + b.AddTask(task, ch) + + r := <-ch + + if r.Err != nil { + return nil, r.Err + } + if r.Result == nil { + return nil, VolumeNotBeingModified + } + return r.Result, nil +} + // ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS // volume with the parameters in ModifyDiskOptions. // The resizing operation is performed only when newSizeBytes != 0. @@ -705,7 +761,7 @@ func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { return true, nil } -// executes a batched DescribeInstances API call +// execBatchDescribeInstances executes a batched DescribeInstances API call func execBatchDescribeInstances(svc EC2API, input []string) (map[string]*types.Instance, error) { klog.V(7).InfoS("execBatchDescribeInstances", "instanceIds", input) request := &ec2.DescribeInstancesInput{ @@ -1598,7 +1654,7 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) } waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) { - m, err := c.getLatestVolumeModification(ctx, volumeID) + m, err := c.getLatestVolumeModification(ctx, volumeID, true) // Consider volumes that have never been modified as done if err != nil && errors.Is(err, VolumeNotBeingModified) { return true, nil @@ -1621,25 +1677,55 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) return nil } +func describeVolumesModifications(ctx context.Context, svc EC2API, request *ec2.DescribeVolumesModificationsInput) ([]types.VolumeModification, error) { + volumeModifications := []types.VolumeModification{} + var nextToken *string + for { + response, err := svc.DescribeVolumesModifications(ctx, request) + if err != nil { + if isAWSErrorModificationNotFound(err) { + return nil, VolumeNotBeingModified + } + return nil, fmt.Errorf("error describing volume modifications: %w", err) + } + + volumeModifications = append(volumeModifications, response.VolumesModifications...) + + nextToken = response.NextToken + if aws.ToString(nextToken) == "" { + break + } + request.NextToken = nextToken + } + return volumeModifications, nil +} + // getLatestVolumeModification returns the last modification of the volume. -func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string) (*types.VolumeModification, error) { +func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string, isBatched bool) (*types.VolumeModification, error) { request := &ec2.DescribeVolumesModificationsInput{ VolumeIds: []string{volumeID}, } - mod, err := c.ec2.DescribeVolumesModifications(ctx, request) - if err != nil { - if isAWSErrorModificationNotFound(err) { + + // TODO Q: I see this as the cleanest way to NOT batch certain DVM calls. + // TODO Q Cont: Would making a separate batcher with maxEntries 1 or maxDelay 0 be cleaner? + if c.bm == nil || !isBatched { + mod, err := c.ec2.DescribeVolumesModifications(ctx, request) + if err != nil { + if isAWSErrorModificationNotFound(err) { + return nil, VolumeNotBeingModified + } + return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err) + } + + volumeMods := mod.VolumesModifications + if len(volumeMods) == 0 { return nil, VolumeNotBeingModified } - return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err) - } - volumeMods := mod.VolumesModifications - if len(volumeMods) == 0 { - return nil, VolumeNotBeingModified + return &volumeMods[len(volumeMods)-1], nil // TODO Q Check for nil pointer shenanigan + } else { + return c.batchDescribeVolumesModifications(request) } - - return &volumeMods[len(volumeMods)-1], nil } // randomAvailabilityZone returns a random zone from the given region @@ -1710,7 +1796,8 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi } oldSizeGiB := *volume.Size - latestMod, err := c.getLatestVolumeModification(ctx, volumeID) + // This call must NOT be batched because a missing volume modification will return client error + latestMod, err := c.getLatestVolumeModification(ctx, volumeID, false) if err != nil && !errors.Is(err, VolumeNotBeingModified) { return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err) } @@ -1733,6 +1820,7 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi // At this point, we know we are starting a new volume modification // If we're asked to modify a volume to its current state, ignore the request and immediately return a success + // This is because as of March 2024, EC2 ModifyVolume calls that don't change any parameters still modify the volume if !needsVolumeModification(*volume, newSizeGiB, options) { klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID) // Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 1b98045d57..a728f7570f 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -217,17 +217,15 @@ func executeDescribeVolumesTest(t *testing.T, c *cloud, volumeIDs, volumeNames [ wg.Add(1) r[i] = make(chan *types.Volume, 1) e[i] = make(chan error, 1) - go func(req *ec2.DescribeVolumesInput, resultCh chan *types.Volume, errCh chan error) { + go func(resultCh chan *types.Volume, errCh chan error) { defer wg.Done() - volume, err := c.batchDescribeVolumes(req) + volume, err := c.batchDescribeVolumes(request) if err != nil { errCh <- err return } resultCh <- volume - // passing `request` as a parameter to create a copy - // TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010 - }(request, r[i], e[i]) + }(r[i], e[i]) } wg.Wait() @@ -323,17 +321,15 @@ func executeDescribeInstancesTest(t *testing.T, c *cloud, instanceIds []string, r[i] = make(chan types.Instance, 1) e[i] = make(chan error, 1) - go func(req *ec2.DescribeInstancesInput, resultCh chan types.Instance, errCh chan error) { + go func(resultCh chan types.Instance, errCh chan error) { defer wg.Done() - instance, err := c.batchDescribeInstances(req) + instance, err := c.batchDescribeInstances(request) if err != nil { errCh <- err return } resultCh <- *instance - // passing `request` as a parameter to create a copy - // TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010 - }(request, r[i], e[i]) + }(r[i], e[i]) } wg.Wait() @@ -499,17 +495,15 @@ func executeDescribeSnapshotsTest(t *testing.T, c *cloud, snapshotIDs, snapshotN r[i] = make(chan *types.Snapshot, 1) e[i] = make(chan error, 1) - go func(req *ec2.DescribeSnapshotsInput, resultCh chan *types.Snapshot, errCh chan error) { + go func(resultCh chan *types.Snapshot, errCh chan error) { defer wg.Done() - snapshot, err := c.batchDescribeSnapshots(req) + snapshot, err := c.batchDescribeSnapshots(request) if err != nil { errCh <- err return } resultCh <- snapshot - // passing `request` as a parameter to create a copy - // TODO remove after govet stops complaining about https://github.com/golang/go/discussions/56010 - }(request, r[i], e[i]) + }(r[i], e[i]) } wg.Wait() @@ -533,6 +527,108 @@ func executeDescribeSnapshotsTest(t *testing.T, c *cloud, snapshotIDs, snapshotN } } +func TestBatchDescribeVolumesModifications(t *testing.T) { + testCases := []struct { + name string + volumeIds []string + mockFunc func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification) + expErr error + }{ + { + name: "success: volumeModification by ID", + volumeIds: []string{"vol-001", "vol-002", "vol-003"}, + mockFunc: func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification) { + volumeModificationsOutput := &ec2.DescribeVolumesModificationsOutput{VolumesModifications: volumeModifications} + mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(volumeModificationsOutput, expErr).Times(1) + }, + expErr: nil, + }, + { + name: "fail: EC2 API generic error", + volumeIds: []string{"vol-001", "vol-002", "vol-003"}, + mockFunc: func(mockEC2 *MockEC2API, expErr error, volumeModifications []types.VolumeModification) { + mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(nil, expErr).Times(1) + }, + expErr: fmt.Errorf("generic EC2 API error"), + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockEC2 := NewMockEC2API(mockCtrl) + c := newCloud(mockEC2) + cloudInstance := c.(*cloud) + cloudInstance.bm = newBatcherManager(cloudInstance.ec2) + + // Setup mocks + var volumeModifications []types.VolumeModification + for _, volumeId := range tc.volumeIds { + volumeModifications = append(volumeModifications, types.VolumeModification{VolumeId: aws.String(volumeId)}) + } + tc.mockFunc(mockEC2, tc.expErr, volumeModifications) + + executeDescribeVolumesModificationsTest(t, cloudInstance, tc.volumeIds, tc.expErr) + }) + } +} + +func executeDescribeVolumesModificationsTest(t *testing.T, c *cloud, volumeIds []string, expErr error) { + var wg sync.WaitGroup + + getRequestForID := func(id string) *ec2.DescribeVolumesModificationsInput { + return &ec2.DescribeVolumesModificationsInput{VolumeIds: []string{id}} + } + + requests := make([]*ec2.DescribeVolumesModificationsInput, 0, len(volumeIds)) + for _, volumeId := range volumeIds { + requests = append(requests, getRequestForID(volumeId)) + } + + r := make([]chan types.VolumeModification, len(requests)) + e := make([]chan error, len(requests)) + + for i, request := range requests { + wg.Add(1) + r[i] = make(chan types.VolumeModification, 1) + e[i] = make(chan error, 1) + + go func(resultCh chan types.VolumeModification, errCh chan error) { + defer wg.Done() + volumeModification, err := c.batchDescribeVolumesModifications(request) + if err != nil { + errCh <- err + return + } + resultCh <- *volumeModification + }(r[i], e[i]) + } + + wg.Wait() + + for i := range requests { + select { + case result := <-r[i]: + if &result == (&types.VolumeModification{}) { + t.Errorf("Received nil result for a request") + } + case err := <-e[i]: + if expErr == nil { + t.Errorf("Error while processing request: %v", err) + } + if !errors.Is(err, expErr) { + t.Errorf("Expected error %v, but got %v", expErr, err) + } + default: + t.Errorf("Did not receive a result or an error for a request") + } + } +} + func TestCreateDisk(t *testing.T) { testCases := []struct { name string