diff --git a/cmd/main.go b/cmd/main.go index 797ef2893a..78cd00b4d9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -73,6 +73,7 @@ func main() { driver.WithUserAgentExtra(options.ControllerOptions.UserAgentExtra), driver.WithOtelTracing(options.ServerOptions.EnableOtelTracing), driver.WithBatching(options.ControllerOptions.Batching), + driver.WithModifyVolumeRequestHandlerTimeout(options.ControllerOptions.ModifyVolumeRequestHandlerTimeout), ) if err != nil { klog.ErrorS(err, "failed to create driver") diff --git a/cmd/options/controller_options.go b/cmd/options/controller_options.go index ecffd97dfe..4599d4b5f5 100644 --- a/cmd/options/controller_options.go +++ b/cmd/options/controller_options.go @@ -17,8 +17,11 @@ limitations under the License. package options import ( + "time" + flag "github.com/spf13/pflag" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver" cliflag "k8s.io/component-base/cli/flag" ) @@ -41,6 +44,9 @@ type ControllerOptions struct { UserAgentExtra string // flag to enable batching of API calls Batching bool + // flag to set the timeout for volume modification requests to be coalesced into a single + // volume modification call to AWS. + ModifyVolumeRequestHandlerTimeout time.Duration } func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) { @@ -51,4 +57,5 @@ func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) { fs.BoolVar(&s.WarnOnInvalidTag, "warn-on-invalid-tag", false, "To warn on invalid tags, instead of returning an error") fs.StringVar(&s.UserAgentExtra, "user-agent-extra", "", "Extra string appended to user agent.") fs.BoolVar(&s.Batching, "batching", false, "To enable batching of API calls. 
This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits.") + fs.DurationVar(&s.ModifyVolumeRequestHandlerTimeout, "modify-volume-request-handler-timeout", driver.DefaultModifyVolumeRequestHandlerTimeout, "Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. This must be lower than the csi-resizer and volumemodifier timeouts") } diff --git a/cmd/options/controller_options_test.go b/cmd/options/controller_options_test.go index ce12d39995..02919db846 100644 --- a/cmd/options/controller_options_test.go +++ b/cmd/options/controller_options_test.go @@ -53,6 +53,11 @@ func TestControllerOptions(t *testing.T) { flag: "user-agent-extra", found: true, }, + { + name: "lookup modify-volume-request-handler-timeout", + flag: "modify-volume-request-handler-timeout", + found: true, + }, { name: "fail for non-desired flag", flag: "some-other-flag", diff --git a/docs/options.md b/docs/options.md index 707dc03e84..fd1ef08063 100644 --- a/docs/options.md +++ b/docs/options.md @@ -13,3 +13,4 @@ There are a couple of driver options that can be passed as arguments when starti | user-agent-extra | csi-ebs | helm | Extra string appended to user agent| | enable-otel-tracing | true | false | If set to true, the driver will enable opentelemetry tracing. Might need [additional env variables](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration) to export the traces to the right collector| | batching | true | true | If set to true, the driver will enable batching of API calls. 
This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits at the cost of a small increase to worst-case latency| +| modify-volume-request-handler-timeout | 10s | 2s | Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. If changing this, be aware that the ebs-csi-controller's csi-resizer and volumemodifier containers both have timeouts on the calls they make. If this value exceeds those timeouts, their calls will always fail and fall into a retry loop, so adjust those values accordingly. | diff --git a/pkg/driver/constants.go b/pkg/driver/constants.go index 19839369a9..f34ccbc7ee 100644 --- a/pkg/driver/constants.go +++ b/pkg/driver/constants.go @@ -16,6 +16,8 @@ limitations under the License. package driver +import "time" + // constants of keys in PublishContext const ( // devicePathKey represents key for device path in PublishContext @@ -156,7 +158,8 @@ const ( // constants for default command line flag values const ( - DefaultCSIEndpoint = "unix://tmp/csi.sock" + DefaultCSIEndpoint = "unix://tmp/csi.sock" + DefaultModifyVolumeRequestHandlerTimeout = 2 * time.Second ) // constants for disk block size diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index 93ba89abe5..7f7d9082db 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -20,8 +20,6 @@ const ( ModificationKeyIOPS = "iops" ModificationKeyThroughput = "throughput" - - modifyVolumeRequestHandlerTimeout = 2 * time.Second ) type modifyVolumeRequest struct { @@ -118,7 +116,7 @@ func (d *controllerService) processModifyVolumeRequests(h *modifyVolumeRequestHa select { case req := <-h.requestChan: process(req) - case <-time.After(modifyVolumeRequestHandlerTimeout): + case <-time.After(d.driverOptions.modifyVolumeRequestHandlerTimeout): 
d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID) // At this point, no new requests can come in on the request channel because it has been removed from the map // However, the request channel may still have requests waiting on it diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index ca8d0440bc..fb7be4c907 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" csi "github.com/container-storage-interface/spec/lib/go/csi" @@ -64,24 +65,26 @@ type Driver struct { } type DriverOptions struct { - endpoint string - extraTags map[string]string - mode Mode - volumeAttachLimit int64 - kubernetesClusterID string - awsSdkDebugLog bool - batching bool - warnOnInvalidTag bool - userAgentExtra string - otelTracing bool + endpoint string + extraTags map[string]string + mode Mode + volumeAttachLimit int64 + kubernetesClusterID string + awsSdkDebugLog bool + batching bool + warnOnInvalidTag bool + userAgentExtra string + otelTracing bool + modifyVolumeRequestHandlerTimeout time.Duration } func NewDriver(options ...func(*DriverOptions)) (*Driver, error) { klog.InfoS("Driver Information", "Driver", DriverName, "Version", driverVersion) driverOptions := DriverOptions{ - endpoint: DefaultCSIEndpoint, - mode: AllMode, + endpoint: DefaultCSIEndpoint, + mode: AllMode, + modifyVolumeRequestHandlerTimeout: DefaultModifyVolumeRequestHandlerTimeout, } for _, option := range options { option(&driverOptions) @@ -253,3 +256,12 @@ func WithOtelTracing(enableOtelTracing bool) func(*DriverOptions) { o.otelTracing = enableOtelTracing } } + +func WithModifyVolumeRequestHandlerTimeout(timeout time.Duration) func(*DriverOptions) { + return func(o *DriverOptions) { + if timeout == 0 { + return + } + o.modifyVolumeRequestHandlerTimeout = timeout + } +} diff --git a/pkg/driver/driver_test.go b/pkg/driver/driver_test.go index efe033c63e..e0a692a9d2 100644 --- 
a/pkg/driver/driver_test.go +++ b/pkg/driver/driver_test.go @@ -19,6 +19,7 @@ package driver import ( "reflect" "testing" + "time" ) func TestWithEndpoint(t *testing.T) { @@ -121,3 +122,13 @@ func TestWithBatching(t *testing.T) { t.Fatalf("expected batching option got set to %v but is set to %v", batching, options.batching) } } + +func TestWithModifyVolumeRequestHandlerTimeout(t *testing.T) { + timeout := 15 * time.Second + options := &DriverOptions{} + WithModifyVolumeRequestHandlerTimeout(timeout)(options) + if options.modifyVolumeRequestHandlerTimeout != timeout { + t.Fatalf("expected modifyVolumeRequestHandlerTimeout option got set to %v but is set to %v", + timeout, options.modifyVolumeRequestHandlerTimeout) + } +} diff --git a/pkg/driver/request_coalescing_test.go b/pkg/driver/request_coalescing_test.go index ec7cfeaf2b..f91d028157 100644 --- a/pkg/driver/request_coalescing_test.go +++ b/pkg/driver/request_coalescing_test.go @@ -3,6 +3,7 @@ package driver import ( "context" "fmt" + // "errors" "sync" "testing" @@ -40,9 +41,11 @@ func TestBasicRequestCoalescingSuccess(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -95,9 +98,11 @@ func TestRequestFail(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -164,9 +169,11 @@ func TestPartialFail(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + 
cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -249,9 +256,11 @@ func TestSequentialRequests(t *testing.T) { }).Times(2) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -307,9 +316,11 @@ func TestDuplicateRequest(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -372,9 +383,11 @@ func TestContextTimeout(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } @@ -438,9 +451,11 @@ func TestResponseReturnTiming(t *testing.T) { }) awsDriver := controllerService{ - cloud: mockCloud, - inFlight: internal.NewInFlight(), - driverOptions: &DriverOptions{}, + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{ + modifyVolumeRequestHandlerTimeout: 2 * time.Second, + }, modifyVolumeManager: newModifyVolumeManager(), } diff --git a/pkg/driver/validation.go b/pkg/driver/validation.go index 8ba4c7be3c..7cce5a0e6e 100644 --- a/pkg/driver/validation.go +++ b/pkg/driver/validation.go @@ -17,6 +17,7 @@ limitations under the License. 
package driver import ( + "errors" "fmt" "regexp" "strings" @@ -34,6 +35,10 @@ func ValidateDriverOptions(options *DriverOptions) error { return fmt.Errorf("Invalid mode: %w", err) } + if options.modifyVolumeRequestHandlerTimeout == 0 { + return errors.New("Invalid modifyVolumeRequestHandlerTimeout: Timeout cannot be zero") + } + return nil } diff --git a/pkg/driver/validation_test.go b/pkg/driver/validation_test.go index 42f85bceac..4730a42ec8 100644 --- a/pkg/driver/validation_test.go +++ b/pkg/driver/validation_test.go @@ -17,11 +17,13 @@ limitations under the License. package driver import ( + "errors" "fmt" "math/rand" "reflect" "strconv" "testing" + "time" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" ) @@ -156,20 +158,23 @@ func TestValidateMode(t *testing.T) { func TestValidateDriverOptions(t *testing.T) { testCases := []struct { - name string - mode Mode - extraVolumeTags map[string]string - expErr error + name string + mode Mode + extraVolumeTags map[string]string + modifyVolumeTimeout time.Duration + expErr error }{ { - name: "success", - mode: AllMode, - expErr: nil, + name: "success", + mode: AllMode, + modifyVolumeTimeout: 5 * time.Second, + expErr: nil, }, { - name: "fail because validateMode fails", - mode: Mode("unknown"), - expErr: fmt.Errorf("Invalid mode: %w", fmt.Errorf("Mode is not supported (actual: unknown, supported: %v)", []Mode{AllMode, ControllerMode, NodeMode})), + name: "fail because validateMode fails", + mode: Mode("unknown"), + modifyVolumeTimeout: 5 * time.Second, + expErr: fmt.Errorf("Invalid mode: %w", fmt.Errorf("Mode is not supported (actual: unknown, supported: %v)", []Mode{AllMode, ControllerMode, NodeMode})), }, { name: "fail because validateExtraVolumeTags fails", @@ -177,15 +182,23 @@ func TestValidateDriverOptions(t *testing.T) { extraVolumeTags: map[string]string{ randomString(cloud.MaxTagKeyLength + 1): "extra-tag-value", }, - expErr: fmt.Errorf("Invalid extra tags: %w", fmt.Errorf("Tag key too long (actual: 
%d, limit: %d)", cloud.MaxTagKeyLength+1, cloud.MaxTagKeyLength)), + modifyVolumeTimeout: 5 * time.Second, + expErr: fmt.Errorf("Invalid extra tags: %w", fmt.Errorf("Tag key too long (actual: %d, limit: %d)", cloud.MaxTagKeyLength+1, cloud.MaxTagKeyLength)), + }, + { + name: "fail because modifyVolumeRequestHandlerTimeout is zero", + mode: AllMode, + modifyVolumeTimeout: 0, + expErr: errors.New("Invalid modifyVolumeRequestHandlerTimeout: Timeout cannot be zero"), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { err := ValidateDriverOptions(&DriverOptions{ - extraTags: tc.extraVolumeTags, - mode: tc.mode, + extraTags: tc.extraVolumeTags, + mode: tc.mode, + modifyVolumeRequestHandlerTimeout: tc.modifyVolumeTimeout, }) if !reflect.DeepEqual(err, tc.expErr) { t.Fatalf("error not equal\ngot:\n%s\nexpected:\n%s", err, tc.expErr)