csi: release claims via csi_hook postrun unpublish RPC (#8580)
Add a Postrun hook to send the `CSIVolume.Unpublish` RPC to the server. This
may forward client RPCs to the node plugins or to the controller plugins,
depending on whether other allocations on this node have claims on this
volume.

By making clients responsible for sending the `CSIVolume.Unpublish` RPC (and
making the RPC available to a `nomad volume detach` command), the
volumewatcher is now used only by the core GC job, and we no longer need
async volume GC on job deregister and node update.
tgross committed Aug 6, 2020
1 parent 713b5f5 commit 9384b1f
Showing 9 changed files with 47 additions and 294 deletions.
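
The interesting part of this change is the server-side fan-out the commit message describes: `CSIVolume.Unpublish` may need a node-plugin unpublish, a controller-plugin detach, or both, depending on what else on the node still claims the volume. Below is a self-contained toy sketch of that decision rule; every name in it is invented for illustration, and the real endpoint in the Nomad server differs in detail.

```go
package main

import "fmt"

// Toy stand-ins for structs.CSIVolume and structs.CSIVolumeClaim.
type volume struct {
	id     string
	claims map[string]string // allocation ID -> node ID of active claims
}

type claim struct {
	allocID string
	nodeID  string
}

// hasOtherClaimsOnNode reports whether another allocation on the same
// node still claims the volume.
func hasOtherClaimsOnNode(v *volume, c claim) bool {
	for allocID, nodeID := range v.claims {
		if allocID != c.allocID && nodeID == c.nodeID {
			return true
		}
	}
	return false
}

// unpublish sketches the fan-out: the node plugin always unmounts, but
// the controller plugin detaches only once the node holds no other claim.
func unpublish(v *volume, c claim) {
	fmt.Println("node unpublish of", v.id, "on", c.nodeID)
	if !hasOtherClaimsOnNode(v, c) {
		fmt.Println("controller unpublish of", v.id)
	}
	delete(v.claims, c.allocID)
}

func main() {
	v := &volume{id: "vol-1", claims: map[string]string{"alloc-a": "node-1", "alloc-b": "node-1"}}
	unpublish(v, claim{allocID: "alloc-a", nodeID: "node-1"}) // alloc-b still claims: node-only
	unpublish(v, claim{allocID: "alloc-b", nodeID: "node-1"}) // last claim: controller too
}
```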
46 changes: 40 additions & 6 deletions client/allocrunner/csi_hook.go
@@ -5,6 +5,7 @@ import (
 	"fmt"

 	hclog "github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/drivers"
@@ -15,12 +16,13 @@ import (
 //
 // It is a noop for allocs that do not depend on CSI Volumes.
 type csiHook struct {
-	ar         *allocRunner
-	alloc      *structs.Allocation
-	logger     hclog.Logger
-	csimanager csimanager.Manager
-	rpcClient  RPCer
-	updater    hookResourceSetter
+	ar             *allocRunner
+	alloc          *structs.Allocation
+	logger         hclog.Logger
+	csimanager     csimanager.Manager
+	rpcClient      RPCer
+	updater        hookResourceSetter
+	volumeRequests map[string]*volumeAndRequest
 }

 func (c *csiHook) Name() string {
@@ -43,6 +45,7 @@ func (c *csiHook) Prerun() error {
 	if err != nil {
 		return fmt.Errorf("claim volumes: %v", err)
 	}
+	c.volumeRequests = volumes

 	mounts := make(map[string]*csimanager.MountInfo, len(volumes))
 	for alias, pair := range volumes {
@@ -73,6 +76,37 @@ func (c *csiHook) Prerun() error {
 	return nil
 }

+// Postrun sends an RPC to the server to unpublish the volume. This may
+// forward client RPCs to the node plugins or to the controller plugins,
+// depending on whether other allocations on this node have claims on this
+// volume.
+func (c *csiHook) Postrun() error {
+	if !c.shouldRun() {
+		return nil
+	}
+
+	var mErr *multierror.Error
+
+	for _, pair := range c.volumeRequests {
+		req := &structs.CSIVolumeUnpublishRequest{
+			VolumeID: pair.request.Source,
+			Claim: &structs.CSIVolumeClaim{
+				AllocationID: c.alloc.ID,
+				NodeID:       c.alloc.NodeID,
+				Mode:         structs.CSIVolumeClaimRelease,
+			},
+			WriteRequest: structs.WriteRequest{
+				Region: c.alloc.Job.Region, Namespace: c.alloc.Job.Namespace},
+		}
+		err := c.rpcClient.RPC("CSIVolume.Unpublish",
+			req, &structs.CSIVolumeUnpublishResponse{})
+		if err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
+}
+
 type volumeAndRequest struct {
 	volume *structs.CSIVolume
 	request *structs.VolumeRequest
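
Note the error handling in `Postrun` above: each failed unpublish is appended to a `*multierror.Error` and the loop keeps going, so one bad volume doesn't strand the claims on the others. Here is a minimal runnable sketch of that best-effort pattern; the `unpublishAll` helper and the fake RPC function are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// unpublishAll mirrors Postrun's loop: attempt every volume, collect
// every failure, and report them together (or nil) at the end.
func unpublishAll(volumeIDs []string, rpc func(volID string) error) error {
	var mErr *multierror.Error
	for _, id := range volumeIDs {
		if err := rpc(id); err != nil {
			mErr = multierror.Append(mErr, err)
		}
	}
	return mErr.ErrorOrNil()
}

func main() {
	err := unpublishAll([]string{"vol-a", "vol-b", "vol-c"}, func(id string) error {
		if id == "vol-b" {
			return errors.New("unpublish failed for vol-b")
		}
		return nil
	})
	// vol-a and vol-c were still attempted; only vol-b's failure is reported.
	fmt.Println(err)
}
```

The server-side claim GC (see the `core_sched.go` change below) remains as the backstop for any claim the client fails to release this way.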
2 changes: 1 addition & 1 deletion nomad/config.go
@@ -402,7 +402,7 @@ func DefaultConfig() *Config {
 		CSIPluginGCInterval:           5 * time.Minute,
 		CSIPluginGCThreshold:          1 * time.Hour,
 		CSIVolumeClaimGCInterval:      5 * time.Minute,
-		CSIVolumeClaimGCThreshold:     1 * time.Hour,
+		CSIVolumeClaimGCThreshold:     5 * time.Minute,
 		EvalNackTimeout:               60 * time.Second,
 		EvalDeliveryLimit:             3,
 		EvalNackInitialReenqueueDelay: 1 * time.Second,
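
Shortening `CSIVolumeClaimGCThreshold` from 1 hour to 5 minutes follows from the rest of the commit: now that clients release claims in the `Postrun` hook, the periodic GC only mops up claims that slipped through, so it can act sooner. A toy illustration of how the interval and threshold interact under the new defaults; the eligibility rule here is a simplification of the real GC logic.

```go
package main

import (
	"fmt"
	"time"
)

// New defaults from the diff above.
const (
	csiVolumeClaimGCInterval  = 5 * time.Minute
	csiVolumeClaimGCThreshold = 5 * time.Minute
)

// eligible is a simplified stand-in for the GC's age check: a leftover
// claim is collected once it is older than the threshold.
func eligible(claimAge time.Duration) bool {
	return claimAge > csiVolumeClaimGCThreshold
}

func main() {
	// A claim just under the threshold at one GC pass waits for the next
	// pass, so cleanup lands within roughly interval + threshold.
	fmt.Println(eligible(4 * time.Minute)) // false: caught on the next pass
	fmt.Println(eligible(6 * time.Minute)) // true: collected now
	fmt.Println("max cleanup latency ~", csiVolumeClaimGCInterval+csiVolumeClaimGCThreshold)
}
```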
4 changes: 2 additions & 2 deletions nomad/core_sched.go
@@ -783,7 +783,7 @@ NEXT_VOLUME:
 		if err != nil {
 			return err
 		}
-		if alloc == nil {
+		if alloc == nil || alloc.TerminalStatus() {
 			err = gcClaims(vol.Namespace, vol.ID)
 			if err != nil {
 				return err
@@ -796,7 +796,7 @@
 		if err != nil {
 			return err
 		}
-		if alloc == nil {
+		if alloc == nil || alloc.TerminalStatus() {
 			err = gcClaims(vol.Namespace, vol.ID)
 			if err != nil {
 				return err
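
The added `|| alloc.TerminalStatus()` means a claim is GC-eligible not only when the allocation has vanished from state, but also when it still exists and has stopped. Below is a rough, self-contained approximation of what that check covers; the real method is `structs.Allocation.TerminalStatus` in `nomad/structs`, and the status strings here are an approximation.

```go
package main

import "fmt"

// terminalStatus approximates structs.Allocation.TerminalStatus: an
// allocation is terminal when the server wants it stopped or the client
// reports it finished, failed, or lost.
func terminalStatus(desiredStatus, clientStatus string) bool {
	switch desiredStatus {
	case "stop", "evict":
		return true
	}
	switch clientStatus {
	case "complete", "failed", "lost":
		return true
	}
	return false
}

func main() {
	fmt.Println(terminalStatus("run", "running"))  // false: claim kept
	fmt.Println(terminalStatus("stop", "running")) // true: claim GC-eligible
	fmt.Println(terminalStatus("run", "failed"))   // true: claim GC-eligible
}
```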
76 changes: 0 additions & 76 deletions nomad/csi_batch.go

This file was deleted.

34 changes: 0 additions & 34 deletions nomad/csi_batch_test.go

This file was deleted.

25 changes: 2 additions & 23 deletions nomad/job_endpoint.go
@@ -780,19 +780,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 		return err
 	}

-	// For a job with volumes, find its volumes before deleting the job.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
-	if job != nil {
-		for _, tg := range job.TaskGroups {
-			for _, vol := range tg.Volumes {
-				if vol.Type == structs.VolumeTypeCSI {
-					volumesToGC.add(vol.Source, job.Namespace)
-				}
-			}
-		}
-	}
-
 	var eval *structs.Evaluation

 	// The job priority / type is strange for this, since it's not a high
@@ -832,13 +819,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 	reply.EvalCreateIndex = index
 	reply.Index = index

-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err = volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// COMPAT(1.1.0) - Remove entire conditional block
 	// 0.12.1 introduced atomic job deregistration eval
 	if eval != nil && args.Eval == nil {
@@ -852,16 +832,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 		// Commit this evaluation via Raft
 		_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
 		if err != nil {
-			result = multierror.Append(result, err)
 			j.logger.Error("eval create failed", "error", err, "method", "deregister")
-			return result.ErrorOrNil()
+			return err
 		}

 		reply.EvalCreateIndex = evalIndex
 		reply.Index = evalIndex
 	}

-	return result.ErrorOrNil()
+	return nil
 }

 // BatchDeregister is used to remove a set of jobs from the cluster.
24 changes: 2 additions & 22 deletions nomad/node_endpoint.go
@@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation

-	// A set of de-duplicated volumes that need their volume claims released.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)
-
 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}

-		// If the terminal alloc has CSI volumes, add the volumes to the batch
-		// of volumes we'll release the claims of.
-		for _, vol := range taskGroup.Volumes {
-			if vol.Type == structs.VolumeTypeCSI {
-				volumesToGC.add(vol.Source, alloc.Namespace)
-			}
-		}
-
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
 		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
 			eval := &structs.Evaluation{
@@ -1140,13 +1128,6 @@
 		}
 	}

-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err := volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// Add this to the batch
 	n.updatesLock.Lock()
 	n.updates = append(n.updates, args.Alloc...)
@@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene

 	// Wait for the future
 	if err := future.Wait(); err != nil {
-		result = multierror.Append(result, err)
-		return result.ErrorOrNil()
+		return err
 	}

 	// Setup the response
 	reply.Index = future.Index()
-	return result.ErrorOrNil()
+	return nil
 }

 // batchUpdate is used to update all the allocations
(2 of the 9 changed files are not shown here.)
