From fa77f10d57c6361abf223cc6ea892d839d6ac41b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 25 Mar 2022 20:49:30 +0000 Subject: [PATCH 1/2] backport of commit 91ead5d3b7845b3933c9ef5bb689491df93820c4 --- client/allocrunner/csi_hook.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 6e4117250dcd..0f4897f7ded9 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -272,7 +272,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C } } c.logger.Debug( - "volume could not be claimed because it is in use, retrying in %v", backoff) + "volume could not be claimed because it is in use", "retry_in", backoff) t.Reset(backoff) } return &resp, err @@ -377,8 +377,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { backoff = c.maxBackoffInterval } } - c.logger.Debug( - "volume could not be unmounted, retrying in %v", backoff) + c.logger.Debug("volume could not be unmounted", "retry_in", backoff) t.Reset(backoff) } return nil From 8a24bc540595ab3b3d071f5ac33e497da1948bfa Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 25 Mar 2022 20:50:08 +0000 Subject: [PATCH 2/2] backport of commit 2639770c1ec409233ab2224954c0cddc0d77db02 --- nomad/core_sched_test.go | 20 ++++++++------------ nomad/csi_endpoint.go | 34 +++++++++++++++++++++------------- nomad/csi_endpoint_test.go | 8 ++++---- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 095975a31f66..e5acb579aff1 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2383,21 +2383,17 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { c := core.(*CoreScheduler) require.NoError(c.csiVolumeClaimGC(gc)) - // TODO(tgross): the condition below means this test doesn't tell - // us much; ideally we should be intercepting the claim request - // and verifying that we send the expected claims but we don't - // have test infra in place to do that for server RPCs - // sending the GC claim will trigger the volumewatcher's normal - // code path. but the volumewatcher will hit an error here - // because there's no path to the node, so we shouldn't see - // the WriteClaims removed + // code path. the volumewatcher will hit an error here because + // there's no path to the node, but this is a node-only plugin so + // we accept that the node has been GC'd and there's no point + // holding onto the claim require.Eventually(func() bool { vol, _ := state.CSIVolumeByID(ws, ns, volID) - return len(vol.WriteClaims) == 1 && - len(vol.WriteAllocs) == 1 && - len(vol.PastClaims) == 1 - }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") + return len(vol.WriteClaims) == 0 && + len(vol.WriteAllocs) == 0 && + len(vol.PastClaims) == 0 + }, time.Second*2, 10*time.Millisecond, "claims were not released") } diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 2d1e8caf46d3..6958a3ef9f94 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -397,15 +397,6 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS args.NodeID = alloc.NodeID } - if isNewClaim { - // if this is a new claim, add a Volume and PublishContext from the - // controller (if any) to the reply - err = v.controllerPublishVolume(args, reply) - if err != nil { - return fmt.Errorf("controller publish: %v", err) - } - } - resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args) if err != nil { v.logger.Error("csi raft apply failed", "error", err, "method", "claim") @@ -415,6 +406,15 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS return respErr } + if isNewClaim { + // if this is a new claim, add a Volume and PublishContext from the + // controller (if any) to the reply + err = v.controllerPublishVolume(args, reply) + if err != nil { + return fmt.Errorf("controller publish: %v", err) + } + } + reply.Index = index v.srv.setQueryMeta(&reply.QueryMeta) return nil @@ -495,7 +495,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, err = v.srv.RPC(method, cReq, cResp) if err != nil { - return fmt.Errorf("attach volume: %v", err) + if strings.Contains(err.Error(), "FailedPrecondition") { + return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err) + } + return err } resp.PublishContext = cResp.PublishContext return nil @@ -650,6 +653,11 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C } func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + if claim.AccessMode == structs.CSIVolumeAccessModeUnknown { + // claim has already been released client-side + return nil + } + req := &cstructs.ClientCSINodeDetachVolumeRequest{ PluginID: vol.PluginID, VolumeID: vol.ID, @@ -745,17 +753,17 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str // and GC'd by this point, so looking there is the last resort. func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) { for _, rClaim := range vol.ReadClaims { - if rClaim.NodeID == claim.NodeID { + if rClaim.NodeID == claim.NodeID && rClaim.ExternalNodeID != "" { return rClaim.ExternalNodeID, nil } } for _, wClaim := range vol.WriteClaims { - if wClaim.NodeID == claim.NodeID { + if wClaim.NodeID == claim.NodeID && wClaim.ExternalNodeID != "" { return wClaim.ExternalNodeID, nil } } for _, pClaim := range vol.PastClaims { - if pClaim.NodeID == claim.NodeID { + if pClaim.NodeID == claim.NodeID && pClaim.ExternalNodeID != "" { return pClaim.ExternalNodeID, nil } } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 50765cf5b1be..3c1c12e97a1d 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -256,7 +256,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) { } claimResp := &structs.CSIVolumeClaimResponse{} err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) - require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0), + require.EqualError(t, err, fmt.Sprintf("volume not found: %s", id0), "expected 'volume not found' error because volume hasn't yet been created") // Create a plugin and volume @@ -448,14 +448,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) { claimResp := &structs.CSIVolumeClaimResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) // Because the node is not registered - require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node") + require.EqualError(t, err, "controller publish: controller attach volume: No path to node") // The node SecretID is authorized for all policies claimReq.AuthToken = node.SecretID claimReq.Namespace = "" claimResp = &structs.CSIVolumeClaimResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) - require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node") + require.EqualError(t, err, "controller publish: controller attach volume: No path to node") } func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { @@ -514,7 +514,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { { name: "first unpublish", startingState: structs.CSIVolumeClaimStateTaken, - expectedErrMsg: "could not detach from node: No path to node", + expectedErrMsg: "could not detach from controller: controller detach volume: No path to node", }, }