Skip to content

Commit

Permalink
handle unpublish without an allocation ID
Browse files Browse the repository at this point in the history
The `nomad volume detach` command doesn't take an allocation ID so that the
operator doesn't have to keep track of alloc IDs that may have been
GC'd. Handle this case in the unpublish RPC by sending the client RPC for all
the terminal/nil allocs on the selected node.
  • Loading branch information
tgross committed Aug 7, 2020
1 parent 21bf6ec commit f611aae
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 27 deletions.
63 changes: 61 additions & 2 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,66 @@ RELEASE_CLAIM:
}

// nodeUnpublishVolume performs the node-detach step for a volume claim and,
// once that step succeeds, marks the claim as
// structs.CSIVolumeClaimStateNodeDetached and checkpoints it.
//
// When the claim carries an allocation ID, only that claim is unpublished.
// When it does not (the `nomad volume detach` command doesn't take an
// allocation ID), every allocation of the volume on claim.NodeID that is
// either terminal or no longer resolvable (nil) is unpublished in turn.
// Errors from the individual unpublish calls are collected and returned
// together; the claim is only checkpointed if none of them failed.
//
// Note that claim is mutated in place: its State is updated on success, and
// in the no-allocation-ID case its AllocationID is overwritten with each
// candidate allocation ID as the loop proceeds.
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	// Simple case: the caller told us exactly which allocation to unpublish.
	if claim.AllocationID != "" {
		if err := v.nodeUnpublishVolumeImpl(vol, claim); err != nil {
			return err
		}
		claim.State = structs.CSIVolumeClaimStateNodeDetached
		return v.checkpointClaim(vol, claim)
	}

	// No allocation ID was supplied, so denormalize the volume to get at
	// its allocations and pick out the ones on this node that are safe to
	// unpublish.
	vol, err := v.srv.fsm.State().CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
	if err != nil {
		return err
	}

	targets := []string{}

	// Readers: a nil alloc is matched to the node through its past claim;
	// a known alloc must be on the node and terminal.
	for id, alloc := range vol.ReadAllocs {
		if alloc != nil {
			if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
				targets = append(targets, id)
			}
			continue
		}
		if readClaim, found := vol.ReadClaims[id]; found && readClaim.NodeID == claim.NodeID {
			targets = append(targets, id)
		}
	}

	// Writers: same selection rules as for readers.
	for id, alloc := range vol.WriteAllocs {
		if alloc != nil {
			if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
				targets = append(targets, id)
			}
			continue
		}
		if writeClaim, found := vol.WriteClaims[id]; found && writeClaim.NodeID == claim.NodeID {
			targets = append(targets, id)
		}
	}

	// Attempt every candidate even if an earlier one fails, accumulating
	// the failures so the caller sees all of them.
	var failures multierror.Error
	for _, id := range targets {
		claim.AllocationID = id
		if unpublishErr := v.nodeUnpublishVolumeImpl(vol, claim); unpublishErr != nil {
			failures.Errors = append(failures.Errors, unpublishErr)
		}
	}
	if err = failures.ErrorOrNil(); err != nil {
		return err
	}

	claim.State = structs.CSIVolumeClaimStateNodeDetached
	return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand All @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return fmt.Errorf("could not detach from node: %w", err)
}
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
return nil
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
Expand Down
77 changes: 52 additions & 25 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,62 +425,88 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {

codec := rpcClient(t, srv)

// setup: create a client node with a controller and node plugin
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0"
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
RequiresControllerPlugin: true,
},
}
index++
require.NoError(t, state.UpsertNode(index, node))

type tc struct {
name string
startingState structs.CSIVolumeClaimState
hasController bool
expectedErrMsg string
}

testCases := []tc{
{
name: "no path to node plugin",
startingState: structs.CSIVolumeClaimStateTaken,
hasController: true,
expectedErrMsg: "could not detach from node: Unknown node ",
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
},
{
name: "no registered controller plugin",
name: "unpublish previously detached node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
hasController: true,
expectedErrMsg: "could not detach from controller: controller detach volume: plugin missing: minnie",
expectedErrMsg: "could not detach from controller: No path to node",
},
{
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
hasController: true,
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
expectedErrMsg: "could not detach from node: No path to node",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

// setup: register a volume
volID := uuid.Generate()
nodeID := uuid.Generate()
allocID := uuid.Generate()

claim := &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: nodeID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
State: tc.startingState,
}

vol := &structs.CSIVolume{
ID: volID,
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "minnie",
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
ControllerRequired: tc.hasController,
ControllerRequired: true,
}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// setup: create an alloc that will claim our volume
alloc := mock.BatchAlloc()
alloc.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusFailed

index++
require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc}))

// setup: claim the volume for our alloc
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
}

index++
claim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, claim)
require.NoError(t, err)

// test: unpublish and check the results
claim.State = tc.startingState
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: volID,
Claim: claim,
Expand All @@ -497,6 +523,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
if tc.expectedErrMsg == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
"error message %q did not contain %q", err.Error(), tc.expectedErrMsg)
}
Expand Down

0 comments on commit f611aae

Please sign in to comment.