Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: checkpoint volume claim garbage collection #7782

Merged
merged 1 commit into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
}
req.Region = c.alloc.Job.Region
Expand Down
2 changes: 0 additions & 2 deletions client/pluginmanager/csimanager/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package csimanager

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -47,7 +46,6 @@ func TestInstanceManager_Shutdown(t *testing.T) {
im.shutdownCtxCancelFn = cancelFn
im.shutdownCh = make(chan struct{})
im.updater = func(_ string, info *structs.CSIInfo) {
fmt.Println(info)
lock.Lock()
defer lock.Unlock()
pluginHealth = info.Healthy
Expand Down
132 changes: 78 additions & 54 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,17 +749,15 @@ func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string,
return err
}

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range gcClaims {
for _, claim := range vol.PastClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
claim: claim,
namespace: namespace,
region: region,
leaderACL: leaderACL,
Expand All @@ -775,48 +773,47 @@ func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string,

}

type gcClaimRequest struct {
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
}

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) ([]gcClaimRequest, map[string]int) {
gcAllocs := []gcClaimRequest{}
func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
mode structs.CSIVolumeClaimMode) {
for _, alloc := range allocs {
// we call denormalize on the volume above to populate
// Allocation pointers. But the alloc might have been
// garbage collected concurrently, so if the alloc is
// still nil we can safely skip it.
if alloc == nil {
continue
claims map[string]*structs.CSIVolumeClaim) {

for allocID, alloc := range allocs {
claim, ok := claims[allocID]
if !ok {
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. note that we'll have non-nil
// allocs here because we called denormalize on the
// value.
claim = &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: alloc.NodeID,
State: structs.CSIVolumeClaimStateTaken,
}
}
nodeClaims[alloc.NodeID]++
nodeClaims[claim.NodeID]++
if runningAllocs || alloc.Terminated() {
gcAllocs = append(gcAllocs, gcClaimRequest{
allocID: alloc.ID,
nodeID: alloc.NodeID,
mode: mode,
})
// only overwrite the PastClaim if this is new,
// so that we can track state between subsequent calls
if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
claim.State = structs.CSIVolumeClaimStateTaken
vol.PastClaims[claim.AllocationID] = claim
}
}
}
}

collectFunc(vol.WriteAllocs, structs.CSIVolumeClaimWrite)
collectFunc(vol.ReadAllocs, structs.CSIVolumeClaimRead)
return gcAllocs, nodeClaims
collectFunc(vol.WriteAllocs, vol.WriteClaims)
collectFunc(vol.ReadAllocs, vol.ReadClaims)
return nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
allocID string
nodeID string
mode structs.CSIVolumeClaimMode
claim *structs.CSIVolumeClaim
region string
namespace string
leaderACL string
Expand All @@ -825,42 +822,78 @@ type volumeClaimReapArgs struct {

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
nodeID := args.nodeID
claim := args.claim

var err error
var nReq *cstructs.ClientCSINodeDetachVolumeRequest

checkpoint := func(claimState structs.CSIVolumeClaimState) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: args.allocID,
NodeID: nodeID,
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: args.mode == structs.CSIVolumeClaimRead,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := srv.RPC("ClientCSI.NodeDetachVolume", nReq,
err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
args.nodeClaims[nodeID]--
err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
if err != nil {
return args.nodeClaims, err
}

NODE_DETACHED:
args.nodeClaims[claim.NodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {
if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, nodeID)
targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, nodeID)
structs.ErrUnknownNodePrefix, claim.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
Expand All @@ -879,18 +912,9 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
}
}

RELEASE_CLAIM:
// (3) release the claim from the state store, allowing it to be rescheduled
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: args.allocID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
err = srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
if err != nil {
return args.nodeClaims, err
}
Expand Down
62 changes: 30 additions & 32 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,12 +2286,22 @@ func TestCSI_GCVolumeClaims_Collection(t *testing.T) {
require.NoError(t, err)

// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(index, ns, volId0, alloc1, structs.CSIVolumeClaimWrite)
err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Mode: structs.CSIVolumeClaimWrite,
})
index++
require.NoError(t, err)
err = state.CSIVolumeClaim(index, ns, volId0, alloc2, structs.CSIVolumeClaimRead)

err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Mode: structs.CSIVolumeClaimRead,
})
index++
require.NoError(t, err)

vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
Expand All @@ -2306,9 +2316,9 @@ func TestCSI_GCVolumeClaims_Collection(t *testing.T) {
vol, err = state.CSIVolumeDenormalize(ws, vol)
require.NoError(t, err)

gcClaims, nodeClaims := collectClaimsToGCImpl(vol, false)
nodeClaims := collectClaimsToGCImpl(vol, false)
require.Equal(t, nodeClaims[node.ID], 2)
require.Len(t, gcClaims, 2)
require.Len(t, vol.PastClaims, 2)
}

func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
Expand All @@ -2326,7 +2336,6 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {

cases := []struct {
Name string
Claim gcClaimRequest
ClaimsCount map[string]int
ControllerRequired bool
ExpectedErr string
Expand All @@ -2338,12 +2347,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
srv *MockRPCServer
}{
{
Name: "NodeDetachVolume fails",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
Name: "NodeDetachVolume fails",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: "node plugin missing",
Expand All @@ -2355,36 +2359,26 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
},
},
{
Name: "ControllerDetachVolume no controllers",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: fmt.Sprintf(
"Unknown node: %s", node.ID),
Name: "ControllerDetachVolume no controllers",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: true,
ExpectedErr: fmt.Sprintf("Unknown node: %s", node.ID),
ExpectedClaimsCount: 0,
ExpectedNodeDetachVolumeCount: 1,
ExpectedControllerDetachVolumeCount: 0,
ExpectedVolumeClaimCount: 1,
srv: &MockRPCServer{
state: s.State(),
},
},
{
Name: "ControllerDetachVolume node-only",
Claim: gcClaimRequest{
allocID: alloc.ID,
nodeID: node.ID,
mode: structs.CSIVolumeClaimRead,
},
Name: "ControllerDetachVolume node-only",
ClaimsCount: map[string]int{node.ID: 1},
ControllerRequired: false,
ExpectedClaimsCount: 0,
ExpectedNodeDetachVolumeCount: 1,
ExpectedControllerDetachVolumeCount: 0,
ExpectedVolumeClaimCount: 1,
ExpectedVolumeClaimCount: 2,
srv: &MockRPCServer{
state: s.State(),
},
Expand All @@ -2394,12 +2388,16 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
vol.ControllerRequired = tc.ControllerRequired
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
State: structs.CSIVolumeClaimStateTaken,
Mode: structs.CSIVolumeClaimRead,
}
nodeClaims, err := volumeClaimReapImpl(tc.srv, &volumeClaimReapArgs{
vol: vol,
plug: plugin,
allocID: tc.Claim.allocID,
nodeID: tc.Claim.nodeID,
mode: tc.Claim.mode,
claim: claim,
region: "global",
namespace: "default",
leaderACL: "not-in-use",
Expand All @@ -2411,7 +2409,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
require.NoError(err)
}
require.Equal(tc.ExpectedClaimsCount,
nodeClaims[tc.Claim.nodeID], "expected claims")
nodeClaims[claim.NodeID], "expected claims remaining")
require.Equal(tc.ExpectedNodeDetachVolumeCount,
tc.srv.countCSINodeDetachVolume, "node detach RPC count")
require.Equal(tc.ExpectedControllerDetachVolumeCount,
Expand Down
7 changes: 6 additions & 1 deletion nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,22 +400,27 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,
return nil
}

// get Nomad's ID for the client node (not the storage provider's ID)
targetNode, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return err
}
if targetNode == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID)
}

// get the the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}
externalNodeID := targetCSIInfo.NodeInfo.ID

method := "ClientCSI.ControllerAttachVolume"
cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
ClientCSINodeID: externalNodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: req.Claim == structs.CSIVolumeClaimRead,
Expand Down
Loading