Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of CSI: reorder controller volume detachment into release/1.1.x #12701

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.C
}
}
c.logger.Debug(
"volume could not be claimed because it is in use, retrying in %v", backoff)
"volume could not be claimed because it is in use", "retry_in", backoff)
t.Reset(backoff)
}
return &resp, err
Expand Down Expand Up @@ -377,8 +377,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be unmounted, retrying in %v", backoff)
c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
t.Reset(backoff)
}
return nil
Expand Down
20 changes: 8 additions & 12 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,21 +2383,17 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

// TODO(tgross): the condition below means this test doesn't tell
// us much; ideally we should be intercepting the claim request
// and verifying that we send the expected claims but we don't
// have test infra in place to do that for server RPCs

// sending the GC claim will trigger the volumewatcher's normal
// code path. but the volumewatcher will hit an error here
// because there's no path to the node, so we shouldn't see
// the WriteClaims removed
// code path. the volumewatcher will hit an error here because
// there's no path to the node, but this is a node-only plugin so
// we accept that the node has been GC'd and there's no point
// holding onto the claim
require.Eventually(func() bool {
vol, _ := state.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 1
}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")
return len(vol.WriteClaims) == 0 &&
len(vol.WriteAllocs) == 0 &&
len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond, "claims were not released")

}

Expand Down
34 changes: 21 additions & 13 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,6 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
args.NodeID = alloc.NodeID
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
Expand All @@ -415,6 +406,15 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return respErr
}

if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}

reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
Expand Down Expand Up @@ -495,7 +495,10 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,

err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return fmt.Errorf("attach volume: %v", err)
if strings.Contains(err.Error(), "FailedPrecondition") {
return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err)
}
return err
}
resp.PublishContext = cResp.PublishContext
return nil
Expand Down Expand Up @@ -650,6 +653,11 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
if claim.AccessMode == structs.CSIVolumeAccessModeUnknown {
// claim has already been released client-side
return nil
}

req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand Down Expand Up @@ -745,17 +753,17 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
// and GC'd by this point, so looking there is the last resort.
func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) (string, error) {
for _, rClaim := range vol.ReadClaims {
if rClaim.NodeID == claim.NodeID {
if rClaim.NodeID == claim.NodeID && rClaim.ExternalNodeID != "" {
return rClaim.ExternalNodeID, nil
}
}
for _, wClaim := range vol.WriteClaims {
if wClaim.NodeID == claim.NodeID {
if wClaim.NodeID == claim.NodeID && wClaim.ExternalNodeID != "" {
return wClaim.ExternalNodeID, nil
}
}
for _, pClaim := range vol.PastClaims {
if pClaim.NodeID == claim.NodeID {
if pClaim.NodeID == claim.NodeID && pClaim.ExternalNodeID != "" {
return pClaim.ExternalNodeID, nil
}
}
Expand Down
8 changes: 4 additions & 4 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
}
claimResp := &structs.CSIVolumeClaimResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
require.EqualError(t, err, fmt.Sprintf("volume not found: %s", id0),
"expected 'volume not found' error because volume hasn't yet been created")

// Create a plugin and volume
Expand Down Expand Up @@ -448,14 +448,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
claimResp := &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
// Because the node is not registered
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
require.EqualError(t, err, "controller publish: controller attach volume: No path to node")

// The node SecretID is authorized for all policies
claimReq.AuthToken = node.SecretID
claimReq.Namespace = ""
claimResp = &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
require.EqualError(t, err, "controller publish: controller attach volume: No path to node")
}

func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
{
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
expectedErrMsg: "could not detach from node: No path to node",
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
},
}

Expand Down