Skip to content

Commit

Permalink
csi: controller unpublish should check current alloc count
Browse files Browse the repository at this point in the history
Using the count of node claims from earlier in the `CSIVolume.Unpublish RPC
doesn't correctly account for cases where the RPC was interrupted but
checkpointed. Instead, we'll check the current allocation count and status to
determine whether we need to send a controller unpublish.
  • Loading branch information
tgross committed Aug 6, 2020
1 parent 416d2ee commit 1a111ab
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,20 +552,6 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
claim := args.Claim
claim.Mode = structs.CSIVolumeClaimRelease

// we send a controller detach if a Nomad client no longer has
// any claim to the volume, so track the counts here
var nodeClaims int
for _, alloc := range vol.ReadAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID {
nodeClaims++
}
}
for _, alloc := range vol.WriteAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID {
nodeClaims++
}
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
Expand All @@ -584,8 +570,7 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
}

NODE_DETACHED:
nodeClaims--
err = v.controllerUnpublishVolume(vol, claim, nodeClaims)
err = v.controllerUnpublishVolume(vol, claim)
if err != nil {
return err
}
Expand Down Expand Up @@ -630,15 +615,35 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim, nodeClaims int) error {
func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {

// we can drop the claim without sending the controller detach if
// another node has a claim on the volume
if !vol.ControllerRequired || nodeClaims >= 1 {
if !vol.ControllerRequired {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
}

// we only send a controller detach if a Nomad client no longer has
// any claim to the volume, so we need to check the status of claimed
// allocations
ws := memdb.NewWatchSet()
state := v.srv.fsm.State()
vol, err := state.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
for _, alloc := range vol.ReadAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
}
}
for _, alloc := range vol.WriteAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
}
}

// if the RPC is sent by a client node, it doesn't know the claim's
// external node ID.
if claim.ExternalNodeID == "" {
Expand All @@ -655,7 +660,7 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
Secrets: vol.Secrets,
}
req.PluginID = vol.PluginID
err := v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
err = v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
// TODO(tgross): need to capture case where ControllerUnpublish previously
Expand Down

0 comments on commit 1a111ab

Please sign in to comment.