Skip to content

Commit

Permalink
Make modify volume interval configurable
Browse files Browse the repository at this point in the history
We're having problems related to race conditions when several changes
are applied to volumes. We don't want those changes to be applied
independently, so we're extracting the modify-volume interval (previously
the fixed request handler timeout) into a driver option to make it
configurable from the outside.
  • Loading branch information
adolsalamanca committed Jan 26, 2024
1 parent 1bdb713 commit 08687a8
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/generate-code-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
ref: ${{ github.event.pull_request.base.ref }}

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand All @@ -39,7 +39,7 @@ jobs:
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/govulncheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/output-code-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
workflow_run:
workflows: [Generate Code Coverage]
types: [completed]

jobs:
output-code-coverage:
name: Output Code Coverage
Expand Down Expand Up @@ -44,7 +44,7 @@ jobs:
sourceRunId: ${{ github.event.workflow_run.id }}

- name: Set up go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: '^1.20.2'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/trivy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
branches:
- master
pull_request:

jobs:
build:
name: Build
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
branches:
- master
pull_request:

jobs:
buildx:
runs-on: ${{ matrix.os }}
Expand All @@ -21,7 +21,7 @@ jobs:
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand Down
6 changes: 4 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (

flag "github.com/spf13/pflag"

logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/logs/json"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
logsapi "k8s.io/component-base/logs/api/v1"
json "k8s.io/component-base/logs/json"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -73,6 +74,7 @@ func main() {
driver.WithUserAgentExtra(options.ControllerOptions.UserAgentExtra),
driver.WithOtelTracing(options.ServerOptions.EnableOtelTracing),
driver.WithBatching(options.ControllerOptions.Batching),
driver.WithCustomModifyVolumeInterval(options.ControllerOptions.ModifyVolumeInterval),
)
if err != nil {
klog.ErrorS(err, "failed to create driver")
Expand Down
5 changes: 5 additions & 0 deletions cmd/options/controller_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package options

import (
"time"

flag "github.com/spf13/pflag"

cliflag "k8s.io/component-base/cli/flag"
Expand All @@ -41,6 +43,8 @@ type ControllerOptions struct {
UserAgentExtra string
// flag to enable batching of API calls
Batching bool
// ModifyVolumeInterval is the interval of time that the controller waits to process a volume change, default is 2s.
ModifyVolumeInterval time.Duration
}

func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
Expand All @@ -51,4 +55,5 @@ func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.WarnOnInvalidTag, "warn-on-invalid-tag", false, "To warn on invalid tags, instead of returning an error")
fs.StringVar(&s.UserAgentExtra, "user-agent-extra", "", "Extra string appended to user agent.")
fs.BoolVar(&s.Batching, "batching", false, "To enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits.")
fs.DurationVar(&s.ModifyVolumeInterval, "modify-volume-interval", 2*time.Second, "is the interval of time that the controller waits to process a volume change, default is 2s. This interval might be useful to avoid race conditions of subsequent changes")
}
4 changes: 4 additions & 0 deletions pkg/driver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package driver

import "time"

// constants of keys in PublishContext
const (
// devicePathKey represents key for device path in PublishContext
Expand Down Expand Up @@ -182,6 +184,8 @@ const (
AgentNotReadyNodeTaintKey = "ebs.csi.aws.com/agent-not-ready"
)

// DefaultModifyVolumeInterval is the modify-volume interval applied when no
// custom interval is configured: 2 seconds. It is a constant so that no
// package can reassign the driver-wide default at runtime.
const DefaultModifyVolumeInterval = 2 * time.Second

type fileSystemConfig struct {
NotSupportedParams map[string]struct{}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
)

const (
Expand Down Expand Up @@ -118,7 +119,7 @@ func (d *controllerService) processModifyVolumeRequests(h *modifyVolumeRequestHa
select {
case req := <-h.requestChan:
process(req)
case <-time.After(modifyVolumeRequestHandlerTimeout):
case <-time.After(d.driverOptions.modifyVolumeInterval):
d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID)
// At this point, no new requests can come in on the request channel because it has been removed from the map
// However, the request channel may still have requests waiting on it
Expand Down
38 changes: 24 additions & 14 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"github.com/container-storage-interface/spec/lib/go/csi"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"k8s.io/klog/v2"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
)

// Mode is the operating mode of the CSI driver.
Expand Down Expand Up @@ -62,24 +64,26 @@ type Driver struct {
}

type DriverOptions struct {
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
modifyVolumeInterval time.Duration
}

func NewDriver(options ...func(*DriverOptions)) (*Driver, error) {
klog.InfoS("Driver Information", "Driver", DriverName, "Version", driverVersion)

driverOptions := DriverOptions{
endpoint: DefaultCSIEndpoint,
mode: AllMode,
endpoint: DefaultCSIEndpoint,
mode: AllMode,
modifyVolumeInterval: DefaultModifyVolumeInterval,
}
for _, option := range options {
option(&driverOptions)
Expand Down Expand Up @@ -167,6 +171,12 @@ func WithEndpoint(endpoint string) func(*DriverOptions) {
}
}

// WithCustomModifyVolumeInterval returns a driver option that overrides the
// modify-volume interval stored in DriverOptions with the given duration.
// The value is stored as-is; no validation is performed here.
func WithCustomModifyVolumeInterval(interval time.Duration) func(*DriverOptions) {
	setInterval := func(opts *DriverOptions) {
		opts.modifyVolumeInterval = interval
	}
	return setInterval
}

func WithExtraTags(extraTags map[string]string) func(*DriverOptions) {
return func(o *DriverOptions) {
o.extraTags = extraTags
Expand Down
10 changes: 10 additions & 0 deletions pkg/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driver
import (
"reflect"
"testing"
"time"
)

func TestWithEndpoint(t *testing.T) {
Expand Down Expand Up @@ -121,3 +122,12 @@ func TestWithBatching(t *testing.T) {
t.Fatalf("expected batching option got set to %v but is set to %v", batching, options.batching)
}
}

// TestWithModifyVolumeInterval checks that the option returned by
// WithCustomModifyVolumeInterval stores the requested interval in
// DriverOptions.
func TestWithModifyVolumeInterval(t *testing.T) {
	const want = 30 * time.Second

	opts := new(DriverOptions)
	apply := WithCustomModifyVolumeInterval(want)
	apply(opts)

	if got := opts.modifyVolumeInterval; got != want {
		t.Fatalf("expected modify volume interval got set to %v but is set to %v", want, got)
	}
}
36 changes: 23 additions & 13 deletions pkg/driver/request_coalescing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog/v2"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/klog/v2"

"github.com/golang/mock/gomock"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
)

Expand All @@ -40,9 +42,11 @@ func TestBasicRequestCoalescingSuccess(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeInterval: DefaultModifyVolumeInterval,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -95,9 +99,11 @@ func TestRequestFail(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeInterval: DefaultModifyVolumeInterval,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -164,9 +170,11 @@ func TestPartialFail(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeInterval: DefaultModifyVolumeInterval,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -307,9 +315,11 @@ func TestDuplicateRequest(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeInterval: DefaultModifyVolumeInterval,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down

0 comments on commit 08687a8

Please sign in to comment.