From f611aaebdb4a1cdb048daf104497bee94809deb6 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 7 Aug 2020 10:42:08 -0400 Subject: [PATCH] handle unpublish without an allocation ID The `nomad volume detach` command doesn't take an allocation ID so that the operator doesn't have to keep track of alloc IDs that may have been GC'd. Handle this case in the unpublish RPC by sending the client RPC for all the terminal/nil allocs on the selected node. --- nomad/csi_endpoint.go | 63 ++++++++++++++++++++++++++++++- nomad/csi_endpoint_test.go | 77 +++++++++++++++++++++++++------------- 2 files changed, 113 insertions(+), 27 deletions(-) diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 290574cd9f18..ed95e994f300 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -589,6 +589,66 @@ RELEASE_CLAIM: } func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + if claim.AllocationID != "" { + err := v.nodeUnpublishVolumeImpl(vol, claim) + if err != nil { + return err + } + claim.State = structs.CSIVolumeClaimStateNodeDetached + return v.checkpointClaim(vol, claim) + } + + // The RPC sent from the 'nomad volume detach' command won't have an + // allocation ID set so we try to unpublish every terminal or invalid + // alloc on the node + allocIDs := []string{} + state := v.srv.fsm.State() + vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol) + if err != nil { + return err + } + for allocID, alloc := range vol.ReadAllocs { + if alloc == nil { + rclaim, ok := vol.ReadClaims[allocID] + if ok && rclaim.NodeID == claim.NodeID { + allocIDs = append(allocIDs, allocID) + } + } else { + if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { + allocIDs = append(allocIDs, allocID) + } + } + } + for allocID, alloc := range vol.WriteAllocs { + if alloc == nil { + wclaim, ok := vol.WriteClaims[allocID] + if ok && wclaim.NodeID == claim.NodeID { + allocIDs = append(allocIDs, allocID) + } + } else { + if 
alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { + allocIDs = append(allocIDs, allocID) + } + } + } + var merr multierror.Error + for _, allocID := range allocIDs { + claim.AllocationID = allocID + err := v.nodeUnpublishVolumeImpl(vol, claim) + if err != nil { + merr.Errors = append(merr.Errors, err) + } + } + err = merr.ErrorOrNil() + if err != nil { + return err + } + + claim.State = structs.CSIVolumeClaimStateNodeDetached + return v.checkpointClaim(vol, claim) +} + +func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { req := &cstructs.ClientCSINodeDetachVolumeRequest{ PluginID: vol.PluginID, VolumeID: vol.ID, @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C return fmt.Errorf("could not detach from node: %w", err) } } - claim.State = structs.CSIVolumeClaimStateNodeDetached - return v.checkpointClaim(vol, claim) + return nil } func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index d372e080df22..900ca1be3505 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -425,48 +425,51 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { codec := rpcClient(t, srv) + // setup: create a client node with a controller and node plugin + node := mock.Node() + node.Attributes["nomad.version"] = "0.11.0" + node.CSINodePlugins = map[string]*structs.CSIInfo{ + "minnie": {PluginID: "minnie", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + node.CSIControllerPlugins = map[string]*structs.CSIInfo{ + "minnie": {PluginID: "minnie", + Healthy: true, + ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true}, + RequiresControllerPlugin: true, + }, + } + index++ + require.NoError(t, state.UpsertNode(index, node)) + type tc struct { name string startingState structs.CSIVolumeClaimState - hasController 
bool expectedErrMsg string } - testCases := []tc{ { - name: "no path to node plugin", - startingState: structs.CSIVolumeClaimStateTaken, - hasController: true, - expectedErrMsg: "could not detach from node: Unknown node ", + name: "success", + startingState: structs.CSIVolumeClaimStateControllerDetached, }, { - name: "no registered controller plugin", + name: "unpublish previously detached node", startingState: structs.CSIVolumeClaimStateNodeDetached, - hasController: true, - expectedErrMsg: "could not detach from controller: controller detach volume: plugin missing: minnie", + expectedErrMsg: "could not detach from controller: No path to node", }, { - name: "success", - startingState: structs.CSIVolumeClaimStateControllerDetached, - hasController: true, + name: "first unpublish", + startingState: structs.CSIVolumeClaimStateTaken, + expectedErrMsg: "could not detach from node: No path to node", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - + // setup: register a volume volID := uuid.Generate() - nodeID := uuid.Generate() - allocID := uuid.Generate() - - claim := &structs.CSIVolumeClaim{ - AllocationID: allocID, - NodeID: nodeID, - ExternalNodeID: "i-example", - Mode: structs.CSIVolumeClaimRead, - State: tc.startingState, - } - vol := &structs.CSIVolume{ ID: volID, Namespace: ns, @@ -474,13 +477,36 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, PluginID: "minnie", Secrets: structs.CSISecrets{"mysecret": "secretvalue"}, - ControllerRequired: tc.hasController, + ControllerRequired: true, } index++ err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(t, err) + // setup: create an alloc that will claim our volume + alloc := mock.BatchAlloc() + alloc.NodeID = node.ID + alloc.ClientStatus = structs.AllocClientStatusFailed + + index++ + require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc})) + + // setup: claim the volume for our 
alloc + claim := &structs.CSIVolumeClaim{ + AllocationID: alloc.ID, + NodeID: node.ID, + ExternalNodeID: "i-example", + Mode: structs.CSIVolumeClaimRead, + } + + index++ + claim.State = structs.CSIVolumeClaimStateTaken + err = state.CSIVolumeClaim(index, ns, volID, claim) + require.NoError(t, err) + + // test: unpublish and check the results + claim.State = tc.startingState req := &structs.CSIVolumeUnpublishRequest{ VolumeID: volID, Claim: claim, @@ -497,6 +523,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { if tc.expectedErrMsg == "" { require.NoError(t, err) } else { + require.Error(t, err) require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg), "error message %q did not contain %q", err.Error(), tc.expectedErrMsg) }