Skip to content

Commit

Permalink
Merge pull request #325 from jacobwolfaws/master
Browse files Browse the repository at this point in the history
Add inflight check to node operations
  • Loading branch information
k8s-ci-robot committed Jun 6, 2023
2 parents 0cfa179 + 34abb4a commit 9523eb1
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
3 changes: 2 additions & 1 deletion hack/kops-patch-node.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
spec:
image: 137112412989/amzn2-ami-hvm-2.0.20210525.0-x86_64-gp2
image: 137112412989/amzn2-ami-hvm-2.0.20220218.3-x86_64-gp2

19 changes: 19 additions & 0 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
nodeCaps = []csi.NodeServiceCapability_RPC_Type{}
)

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
const VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"

type nodeService struct {
metadata cloud.MetadataService
mounter Mounter
Expand Down Expand Up @@ -117,6 +120,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).InfoS("NodePublishVolume: volume operation finished", "volumeId", volumeID)
d.inFlight.Delete(volumeID)
}()

mountOptions := []string{}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
Expand Down Expand Up @@ -171,6 +182,14 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "volumeId", volumeID)
d.inFlight.Delete(volumeID)
}()

// Check if the target is mounted before unmounting
notMnt, _ := d.mounter.IsLikelyNotMountPoint(target)
if notMnt {
Expand Down
76 changes: 76 additions & 0 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package driver
import (
"context"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"reflect"
Expand All @@ -31,6 +33,10 @@ import (
driverMocks "sigs.k8s.io/aws-fsx-csi-driver/pkg/driver/mocks"
)

var (
volumeID = "voltest"
)

func TestNodePublishVolume(t *testing.T) {

var (
Expand Down Expand Up @@ -431,6 +437,36 @@ func TestNodePublishVolume(t *testing.T) {
mockCtl.Finish()
},
},
{
name: "fail another operation in-flight on given volumeId",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
mockMounter := driverMocks.NewMockMounter(mockCtl)

awsDriver := &nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodePublishVolumeRequest{
VolumeId: volumeID,
VolumeContext: map[string]string{
volumeContextDnsName: dnsname,
volumeContextMountName: mountname,
},
VolumeCapability: stdVolCap,
TargetPath: targetPath,
}

awsDriver.inFlight.Insert(volumeID)
_, err := awsDriver.NodePublishVolume(context.TODO(), req)
expectErr(t, err, codes.Aborted)
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -564,6 +600,31 @@ func TestNodeUnpublishVolume(t *testing.T) {
}
},
},
{
name: "fail another operation in-flight on given volumeId",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()

mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
mockMounter := driverMocks.NewMockMounter(mockCtl)

awsDriver := &nodeService{
metadata: mockMetadata,
mounter: mockMounter,
inFlight: internal.NewInFlight(),
}

req := &csi.NodeUnpublishVolumeRequest{
VolumeId: volumeID,
TargetPath: targetPath,
}

awsDriver.inFlight.Insert(volumeID)
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
expectErr(t, err, codes.Aborted)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, tc.testFunc)
Expand Down Expand Up @@ -692,3 +753,18 @@ func getNodeMock(mockCtl *gomock.Controller, nodeName string, returnNode *corev1

return mockClient, mockNode
}

func expectErr(t *testing.T, actualErr error, expectedCode codes.Code) {
if actualErr == nil {
t.Fatalf("Expect error but got no error")
}

status, ok := status.FromError(actualErr)
if !ok {
t.Fatalf("Failed to get error status code from error: %v", actualErr)
}

if status.Code() != expectedCode {
t.Fatalf("Expected error code %d, got %d message %s", expectedCode, status.Code(), status.Message())
}
}

0 comments on commit 9523eb1

Please sign in to comment.