diff --git a/.changelog/14675.txt b/.changelog/14675.txt
new file mode 100644
index 000000000000..2efa8ce2bb18
--- /dev/null
+++ b/.changelog/14675.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+csi: Fixed a bug where a volume that was successfully unmounted by the client but then failed controller unpublishing would not be marked free until garbage collection ran.
+```
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 1ec20f538d86..8ca4e7a2ef66
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -2459,7 +2459,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
 			}
 		}
 		return true
-	}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")
+	}, time.Second*5, 10*time.Millisecond, "invalid claims should be marked for GC")
 
 }
 
diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go
index cdf3d4cd395d..d3cd71cdab60
--- a/nomad/csi_endpoint.go
+++ b/nomad/csi_endpoint.go
@@ -574,6 +574,16 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
 
 	claim := args.Claim
 
+	// we need to checkpoint when we first get the claim to ensure we've set the
+	// initial "past claim" state, otherwise the work of a client that unpublishes
+	// (skipping the node unpublish b/c it's already done that work) will fail to
+	// get written if the controller unpublish fails.
+	vol = vol.Copy()
+	err = v.checkpointClaim(vol, claim)
+	if err != nil {
+		return err
+	}
+
 	// previous checkpoints may have set the past claim state already.
 	// in practice we should never see CSIVolumeClaimStateControllerDetached
 	// but having an option for the state makes it easy to add a checkpoint
@@ -619,6 +629,13 @@ RELEASE_CLAIM:
 // problems. This function should only be called on a copy of the volume.
 func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
 	v.logger.Trace("node unpublish", "vol", vol.ID)
+
+	// We need a new snapshot after each checkpoint
+	snap, err := v.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
 	if claim.AllocationID != "" {
 		err := v.nodeUnpublishVolumeImpl(vol, claim)
 		if err != nil {
@@ -631,8 +648,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
 	// The RPC sent from the 'nomad node detach' command or GC won't have an
 	// allocation ID set so we try to unpublish every terminal or invalid
 	// alloc on the node, all of which will be in PastClaims after denormalizing
-	state := v.srv.fsm.State()
-	vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
+	vol, err = snap.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
 	if err != nil {
 		return err
 	}
@@ -702,10 +718,15 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 		return nil
 	}
 
-	state := v.srv.fsm.State()
+	// We need a new snapshot after each checkpoint
+	snap, err := v.srv.fsm.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
 	ws := memdb.NewWatchSet()
 
-	plugin, err := state.CSIPluginByID(ws, vol.PluginID)
+	plugin, err := snap.CSIPluginByID(ws, vol.PluginID)
 	if err != nil {
 		return fmt.Errorf("could not query plugin: %v", err)
 	} else if plugin == nil {
@@ -717,7 +738,7 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 		return nil
 	}
 
-	vol, err = state.CSIVolumeDenormalize(ws, vol)
+	vol, err = snap.CSIVolumeDenormalize(ws, vol)
 	if err != nil {
 		return err
 	}
diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index b8c54d3855ef..df5c66b3eecc
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -568,8 +569,8 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 
 			// setup: create an alloc that will claim our volume
 			alloc := mock.BatchAlloc()
-			alloc.NodeID = node.ID
-			alloc.ClientStatus = structs.AllocClientStatusFailed
+			alloc.NodeID = tc.nodeID
+			alloc.ClientStatus = structs.AllocClientStatusRunning
 
 			otherAlloc := mock.BatchAlloc()
 			otherAlloc.NodeID = tc.otherNodeID
@@ -579,7 +580,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
 				[]*structs.Allocation{alloc, otherAlloc}))
 
-			// setup: claim the volume for our alloc
+			// setup: claim the volume for our to-be-failed alloc
 			claim := &structs.CSIVolumeClaim{
 				AllocationID: alloc.ID,
 				NodeID:       node.ID,
@@ -617,12 +618,21 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 				},
 			}
 
+			alloc = alloc.Copy()
+			alloc.ClientStatus = structs.AllocClientStatusFailed
+			index++
+			must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
+				[]*structs.Allocation{alloc}))
+
 			err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
 				&structs.CSIVolumeUnpublishResponse{})
 
-			vol, volErr := state.CSIVolumeByID(nil, ns, volID)
-			require.NoError(t, volErr)
-			require.NotNil(t, vol)
+			snap, snapErr := state.Snapshot()
+			must.NoError(t, snapErr)
+
+			vol, volErr := snap.CSIVolumeByID(nil, ns, volID)
+			must.NoError(t, volErr)
+			must.NotNil(t, vol)
 
 			if tc.expectedErrMsg == "" {
 				require.NoError(t, err)
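
The core of the change is ordering: `CSIVolume.Unpublish` now checkpoints the "past claim" state as soon as it receives the claim, and each later step (`nodeUnpublishVolume`, `controllerUnpublishVolume`) reads from a fresh state snapshot taken after the previous checkpoint. The sketch below is a minimal, self-contained model of that checkpoint-before-RPC ordering only; the `volume`, `claimState`, `checkpoint`, and `unpublish` names are illustrative stand-ins, not Nomad's actual types or signatures, and the snapshot refresh is not modeled.

```go
// Illustrative sketch only: toy stand-ins for the checkpoint-before-RPC ordering
// the patch establishes in CSIVolume.Unpublish. Not Nomad's actual API.
package main

import (
	"errors"
	"fmt"
)

// claimState is a toy version of the CSI past-claim states used while unpublishing.
type claimState int

const (
	claimTaken claimState = iota
	claimNodeDetached
	claimReadyToFree
)

// volume stands in for structs.CSIVolume; only the persisted claim state matters here.
type volume struct {
	id    string
	state claimState
}

// checkpoint stands in for checkpointClaim: persist progress so a retry can resume.
// It only ever advances the state, so earlier checkpoints are never undone.
func checkpoint(v *volume, s claimState) error {
	if s > v.state {
		v.state = s
	}
	return nil
}

// unpublish mirrors the ordering of the patched Unpublish RPC: checkpoint first,
// then node unpublish, then controller unpublish, checkpointing after each step
// so a controller failure cannot lose the node-side work already done.
func unpublish(v *volume, nodeUnpublish, controllerUnpublish func() error) error {
	// 1. Checkpoint the initial "past claim" state before doing any work.
	if err := checkpoint(v, claimTaken); err != nil {
		return err
	}

	// 2. Node unpublish, skipped if a previous attempt already got this far.
	if v.state < claimNodeDetached {
		if err := nodeUnpublish(); err != nil {
			return err
		}
		if err := checkpoint(v, claimNodeDetached); err != nil {
			return err
		}
	}

	// 3. Controller unpublish. If it fails, the claimNodeDetached checkpoint
	// survives, so a retry goes straight to this step instead of waiting for GC.
	if err := controllerUnpublish(); err != nil {
		return err
	}
	return checkpoint(v, claimReadyToFree)
}

func main() {
	vol := &volume{id: "vol0"}

	// First attempt: the node-side unpublish succeeds, the controller call fails.
	err := unpublish(vol,
		func() error { return nil },
		func() error { return errors.New("controller unpublish timeout") })
	fmt.Println("first attempt:", err, "state:", vol.state) // state is claimNodeDetached

	// Retry: the persisted state lets us skip the node step and finish the controller step.
	err = unpublish(vol,
		func() error { panic("node unpublish should not run again") },
		func() error { return nil })
	fmt.Println("retry:", err, "state:", vol.state) // state is claimReadyToFree
}
```

The retry in the sketch shows why the early checkpoint matters: because the node-detached progress was persisted before the controller call failed, a later attempt skips the node step and the volume can still reach the ready-to-free state without waiting for garbage collection, which is the behavior the changelog entry describes.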