Skip to content

Commit

Permalink
Merge pull request #12700 from hashicorp/backport/b-csi-volume-detach…
Browse files Browse the repository at this point in the history
…-order/evenly-cosmic-eft

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core committed Apr 20, 2022
2 parents 47b0bb5 + 8a24bc5 commit ce4e4ec
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 32 deletions.
5 changes: 2 additions & 3 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C
}
}
c.logger.Debug(
"volume could not be claimed because it is in use, retrying in %v", backoff)
"volume could not be claimed because it is in use", "retry_in", backoff)
t.Reset(backoff)
}
return &resp, err
Expand Down Expand Up @@ -377,8 +377,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be unmounted, retrying in %v", backoff)
c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
t.Reset(backoff)
}
return nil
Expand Down
20 changes: 8 additions & 12 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,21 +2383,17 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

// TODO(tgross): the condition below means this test doesn't tell
// us much; ideally we should be intercepting the claim request
// and verifying that we send the expected claims but we don't
// have test infra in place to do that for server RPCs

// sending the GC claim will trigger the volumewatcher's normal
// code path. but the volumewatcher will hit an error here
// because there's no path to the node, so we shouldn't see
// the WriteClaims removed
// code path. the volumewatcher will hit an error here because
// there's no path to the node, but this is a node-only plugin so
// we accept that the node has been GC'd and there's no point
// holding onto the claim
require.Eventually(func() bool {
vol, _ := state.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 1
}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")
return len(vol.WriteClaims) == 0 &&
len(vol.WriteAllocs) == 0 &&
len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond, "claims were not released")

}

Expand Down
34 changes: 21 additions & 13 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,6 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
args.NodeID = alloc.NodeID
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
Expand All @@ -415,6 +406,15 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return respErr
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
Expand Down Expand Up @@ -495,7 +495,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,

err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return fmt.Errorf("attach volume: %v", err)
if strings.Contains(err.Error(), "FailedPrecondition") {
return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err)
}
return err
}
resp.PublishContext = cResp.PublishContext
return nil
Expand Down Expand Up @@ -650,6 +653,11 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
if claim.AccessMode == structs.CSIVolumeAccessModeUnknown {
// claim has already been released client-side
return nil
}

req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand Down Expand Up @@ -745,17 +753,17 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
// and GC'd by this point, so looking there is the last resort.
func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) {
for _, rClaim := range vol.ReadClaims {
if rClaim.NodeID == claim.NodeID {
if rClaim.NodeID == claim.NodeID && rClaim.ExternalNodeID != "" {
return rClaim.ExternalNodeID, nil
}
}
for _, wClaim := range vol.WriteClaims {
if wClaim.NodeID == claim.NodeID {
if wClaim.NodeID == claim.NodeID && wClaim.ExternalNodeID != "" {
return wClaim.ExternalNodeID, nil
}
}
for _, pClaim := range vol.PastClaims {
if pClaim.NodeID == claim.NodeID {
if pClaim.NodeID == claim.NodeID && pClaim.ExternalNodeID != "" {
return pClaim.ExternalNodeID, nil
}
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
}
claimResp := &structs.CSIVolumeClaimResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
require.EqualError(t, err, fmt.Sprintf("volume not found: %s", id0),
"expected 'volume not found' error because volume hasn't yet been created")

// Create a plugin and volume
Expand Down Expand Up @@ -448,14 +448,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
claimResp := &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
// Because the node is not registered
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
require.EqualError(t, err, "controller publish: controller attach volume: No path to node")

// The node SecretID is authorized for all policies
claimReq.AuthToken = node.SecretID
claimReq.Namespace = ""
claimResp = &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
require.EqualError(t, err, "controller publish: controller attach volume: No path to node")
}

func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
{
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
expectedErrMsg: "could not detach from node: No path to node",
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
},
}

Expand Down

0 comments on commit ce4e4ec

Please sign in to comment.