Skip to content

Commit

Permalink
csi: move volume claim release into volumewatcher (#7794)
Browse files Browse the repository at this point in the history
This changeset adds a subsystem to run on the leader, similar to the
deployment watcher or node drainer. The `Watcher` performs a blocking
query on updates to the `CSIVolumes` table and triggers reaping of
volume claims.

This will avoid tying up scheduling workers by immediately sending
volume claim workloads into their own loop, rather than blocking the
scheduling workers in the core GC job doing things like talking to CSI
controllers

The volume watcher is enabled on leader step-up and disabled on leader
step-down.

The volume claim GC mechanism now makes an empty claim RPC for the
volume to trigger an index bump. That in turn unblocks the blocking
query in the volume watcher so it can assess which claims can be
released for a volume.
  • Loading branch information
tgross committed Apr 30, 2020
1 parent 25a74ec commit 775de0d
Show file tree
Hide file tree
Showing 21 changed files with 1,721 additions and 503 deletions.
212 changes: 14 additions & 198 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -711,212 +709,30 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")
c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID)

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {

// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
}

volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}

func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

ws := memdb.NewWatchSet()

vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range vol.PastClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
claim: claim,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
}
return result.ErrorOrNil()

}
req.Namespace = eval.Namespace
req.Region = c.srv.config.Region

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
claims map[string]*structs.CSIVolumeClaim) {

for allocID, alloc := range allocs {
claim, ok := claims[allocID]
if !ok {
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. note that we'll have non-nil
// allocs here because we called denormalize on the
// value.
claim = &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: alloc.NodeID,
State: structs.CSIVolumeClaimStateTaken,
}
}
nodeClaims[claim.NodeID]++
if runningAllocs || alloc.Terminated() {
// only overwrite the PastClaim if this is new,
// so that we can track state between subsequent calls
if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
claim.State = structs.CSIVolumeClaimStateTaken
vol.PastClaims[claim.AllocationID] = claim
}
}
}
}

collectFunc(vol.WriteAllocs, vol.WriteClaims)
collectFunc(vol.ReadAllocs, vol.ReadClaims)
return nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
claim *structs.CSIVolumeClaim
region string
namespace string
leaderACL string
nodeClaims map[string]int // node IDs -> count
}

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
claim := args.claim

var err error
var nReq *cstructs.ClientCSINodeDetachVolumeRequest

checkpoint := func(claimState structs.CSIVolumeClaimState) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
if err != nil {
return args.nodeClaims, err
}

NODE_DETACHED:
args.nodeClaims[claim.NodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, claim.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
}

RELEASE_CLAIM:
// (3) release the claim from the state store, allowing it to be rescheduled
err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
if err != nil {
return args.nodeClaims, err
}
return args.nodeClaims, nil
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}
Loading

0 comments on commit 775de0d

Please sign in to comment.