From 9384b1f77e0de136fef28cd7e9329482711887a5 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 6 Aug 2020 14:51:46 -0400 Subject: [PATCH] csi: release claims via csi_hook postrun unpublish RPC (#8580) Add a Postrun hook to send the `CSIVolume.Unpublish` RPC to the server. This may forward client RPCs to the node plugins or to the controller plugins, depending on whether other allocations on this node have claims on this volume. By making clients responsible for running the `CSIVolume.Unpublish` RPC (and making the RPC available to a `nomad volume detach` command), the volumewatcher becomes only used by the core GC job and we no longer need async volume GC from job deregister and node update. --- client/allocrunner/csi_hook.go | 46 ++++++++-- nomad/config.go | 2 +- nomad/core_sched.go | 4 +- nomad/csi_batch.go | 76 ---------------- nomad/csi_batch_test.go | 34 ------- nomad/job_endpoint.go | 25 +---- nomad/node_endpoint.go | 24 +---- nomad/node_endpoint_test.go | 126 -------------------------- nomad/volumewatcher/volume_watcher.go | 4 - 9 files changed, 47 insertions(+), 294 deletions(-) delete mode 100644 nomad/csi_batch.go delete mode 100644 nomad/csi_batch_test.go diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 0fccc51b5f49..dd41c3dd7eca 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -5,6 +5,7 @@ import ( "fmt" hclog "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" @@ -15,12 +16,13 @@ import ( // // It is a noop for allocs that do not depend on CSI Volumes. type csiHook struct { - ar *allocRunner - alloc *structs.Allocation - logger hclog.Logger - csimanager csimanager.Manager - rpcClient RPCer - updater hookResourceSetter + ar *allocRunner + alloc *structs.Allocation + logger hclog.Logger + csimanager csimanager.Manager + rpcClient RPCer + updater hookResourceSetter + volumeRequests map[string]*volumeAndRequest } func (c *csiHook) Name() string { @@ -43,6 +45,7 @@ func (c *csiHook) Prerun() error { if err != nil { return fmt.Errorf("claim volumes: %v", err) } + c.volumeRequests = volumes mounts := make(map[string]*csimanager.MountInfo, len(volumes)) for alias, pair := range volumes { @@ -73,6 +76,37 @@ func (c *csiHook) Prerun() error { return nil } +// Postrun sends an RPC to the server to unpublish the volume. This may +// forward client RPCs to the node plugins or to the controller plugins, +// depending on whether other allocations on this node have claims on this +// volume. +func (c *csiHook) Postrun() error { + if !c.shouldRun() { + return nil + } + + var mErr *multierror.Error + + for _, pair := range c.volumeRequests { + req := &structs.CSIVolumeUnpublishRequest{ + VolumeID: pair.request.Source, + Claim: &structs.CSIVolumeClaim{ + AllocationID: c.alloc.ID, + NodeID: c.alloc.NodeID, + Mode: structs.CSIVolumeClaimRelease, + }, + WriteRequest: structs.WriteRequest{ + Region: c.alloc.Job.Region, Namespace: c.alloc.Job.Namespace}, + } + err := c.rpcClient.RPC("CSIVolume.Unpublish", + req, &structs.CSIVolumeUnpublishResponse{}) + if err != nil { + mErr = multierror.Append(mErr, err) + } + } + return mErr.ErrorOrNil() +} + type volumeAndRequest struct { volume *structs.CSIVolume request *structs.VolumeRequest diff --git a/nomad/config.go b/nomad/config.go index 2e25e047168d..14e92adf33fb 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -402,7 +402,7 @@ func DefaultConfig() *Config { CSIPluginGCInterval: 5 * time.Minute, CSIPluginGCThreshold: 1 * time.Hour, CSIVolumeClaimGCInterval: 5 * time.Minute, - CSIVolumeClaimGCThreshold: 1 * time.Hour, + CSIVolumeClaimGCThreshold: 5 * time.Minute, EvalNackTimeout: 60 * time.Second, EvalDeliveryLimit: 3, EvalNackInitialReenqueueDelay: 1 * time.Second, diff --git a/nomad/core_sched.go b/nomad/core_sched.go index c2d3a1463d7d..5e47b9184cdc 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -783,7 +783,7 @@ NEXT_VOLUME: if err != nil { return err } - if alloc == nil { + if alloc == nil || alloc.TerminalStatus() { err = gcClaims(vol.Namespace, vol.ID) if err != nil { return err @@ -796,7 +796,7 @@ NEXT_VOLUME: if err != nil { return err } - if alloc == nil { + if alloc == nil || alloc.TerminalStatus() { err = gcClaims(vol.Namespace, vol.ID) if err != nil { return err diff --git a/nomad/csi_batch.go b/nomad/csi_batch.go deleted file mode 100644 index 50e36e7a6c40..000000000000 --- a/nomad/csi_batch.go +++ /dev/null @@ -1,76 +0,0 @@ -package nomad - -import ( - log "github.com/hashicorp/go-hclog" - multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/nomad/nomad/structs" -) - -// csiBatchRelease is a helper for any time we need to release a bunch -// of volume claims at once. It de-duplicates the volumes and batches -// the raft messages into manageable chunks. Intended for use by RPCs -// that have already been forwarded to the leader. -type csiBatchRelease struct { - srv *Server - logger log.Logger - - maxBatchSize int - seen map[string]struct{} - batches []*structs.CSIVolumeClaimBatchRequest -} - -func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease { - return &csiBatchRelease{ - srv: srv, - logger: logger, - maxBatchSize: max, - seen: map[string]struct{}{}, - batches: []*structs.CSIVolumeClaimBatchRequest{{}}, - } -} - -// add the volume ID + namespace to the deduplicated batches -func (c *csiBatchRelease) add(vol, namespace string) { - id := vol + "\x00" + namespace - - // ignore duplicates - _, seen := c.seen[id] - if seen { - return - } - c.seen[id] = struct{}{} - - req := structs.CSIVolumeClaimRequest{ - VolumeID: vol, - Claim: structs.CSIVolumeClaimRelease, - } - req.Namespace = namespace - req.Region = c.srv.config.Region - - for _, batch := range c.batches { - // otherwise append to the first non-full batch - if len(batch.Claims) < c.maxBatchSize { - batch.Claims = append(batch.Claims, req) - return - } - } - // no non-full batch found, make a new one - newBatch := &structs.CSIVolumeClaimBatchRequest{ - Claims: []structs.CSIVolumeClaimRequest{req}} - c.batches = append(c.batches, newBatch) -} - -// apply flushes the batches to raft -func (c *csiBatchRelease) apply() error { - var result *multierror.Error - for _, batch := range c.batches { - if len(batch.Claims) > 0 { - _, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch) - if err != nil { - c.logger.Error("csi raft apply failed", "error", err, "method", "claim") - result = multierror.Append(result, err) - } - } - } - return result.ErrorOrNil() -} diff --git a/nomad/csi_batch_test.go b/nomad/csi_batch_test.go deleted file mode 100644 index 565fef591e30..000000000000 --- a/nomad/csi_batch_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package nomad - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestCSI_Batcher(t *testing.T) { - t.Parallel() - srv, shutdown := TestServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer shutdown() - - batcher := newCSIBatchRelease(srv, nil, 5) - - batcher.add("vol0", "global") - batcher.add("vol", "0global") - batcher.add("vol1", "global") - batcher.add("vol1", "global") - batcher.add("vol2", "global") - batcher.add("vol2", "other") - batcher.add("vol3", "global") - batcher.add("vol4", "global") - batcher.add("vol5", "global") - batcher.add("vol6", "global") - - require.Len(t, batcher.batches, 2) - require.Len(t, batcher.batches[0].Claims, 5, "first batch") - require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2") - require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other") - require.Len(t, batcher.batches[1].Claims, 4, "second batch") -} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 834ead17bba0..a6b43798a296 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -780,19 +780,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD return err } - // For a job with volumes, find its volumes before deleting the job. - // Later we'll apply this raft. - volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100) - if job != nil { - for _, tg := range job.TaskGroups { - for _, vol := range tg.Volumes { - if vol.Type == structs.VolumeTypeCSI { - volumesToGC.add(vol.Source, job.Namespace) - } - } - } - } - var eval *structs.Evaluation // The job priority / type is strange for this, since it's not a high @@ -832,13 +819,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD reply.EvalCreateIndex = index reply.Index = index - // Make a raft apply to release the CSI volume claims of terminal allocs. - var result *multierror.Error - err = volumesToGC.apply() - if err != nil { - result = multierror.Append(result, err) - } - // COMPAT(1.1.0) - Remove entire conditional block // 0.12.1 introduced atomic job deregistration eval if eval != nil && args.Eval == nil { @@ -852,16 +832,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Commit this evaluation via Raft _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) if err != nil { - result = multierror.Append(result, err) j.logger.Error("eval create failed", "error", err, "method", "deregister") - return result.ErrorOrNil() + return err } reply.EvalCreateIndex = evalIndex reply.Index = evalIndex } - return result.ErrorOrNil() + return nil } // BatchDeregister is used to remove a set of jobs from the cluster. diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index c83e463127c6..97af9aafc7b5 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene now := time.Now() var evals []*structs.Evaluation - // A set of de-duplicated volumes that need their volume claims released. - // Later we'll apply this raft. - volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100) - for _, allocToUpdate := range args.Alloc { allocToUpdate.ModifyTime = now.UTC().UnixNano() @@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene continue } - // If the terminal alloc has CSI volumes, add the volumes to the batch - // of volumes we'll release the claims of. - for _, vol := range taskGroup.Volumes { - if vol.Type == structs.VolumeTypeCSI { - volumesToGC.add(vol.Source, alloc.Namespace) - } - } - // Add an evaluation if this is a failed alloc that is eligible for rescheduling if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) { eval := &structs.Evaluation{ @@ -1140,13 +1128,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene } } - // Make a raft apply to release the CSI volume claims of terminal allocs. - var result *multierror.Error - err := volumesToGC.apply() - if err != nil { - result = multierror.Append(result, err) - } - // Add this to the batch n.updatesLock.Lock() n.updates = append(n.updates, args.Alloc...) @@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene // Wait for the future if err := future.Wait(); err != nil { - result = multierror.Append(result, err) - return result.ErrorOrNil() + return err } // Setup the response reply.Index = future.Index() - return result.ErrorOrNil() + return nil } // batchUpdate is used to update all the allocations diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 7b0c13b492d2..1be5728c598b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2313,132 +2313,6 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) { } } -func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { - t.Parallel() - srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 }) - defer shutdown() - testutil.WaitForLeader(t, srv.RPC) - - codec := rpcClient(t, srv) - state := srv.fsm.State() - - index := uint64(0) - ws := memdb.NewWatchSet() - - // Create a client node, plugin, and volume - node := mock.Node() - node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version - node.CSINodePlugins = map[string]*structs.CSIInfo{ - "csi-plugin-example": {PluginID: "csi-plugin-example", - Healthy: true, - NodeInfo: &structs.CSINodeInfo{}, - ControllerInfo: &structs.CSIControllerInfo{}, - }, - } - index++ - err := state.UpsertNode(index, node) - require.NoError(t, err) - volId0 := uuid.Generate() - ns := structs.DefaultNamespace - vols := []*structs.CSIVolume{{ - ID: volId0, - Namespace: ns, - PluginID: "csi-plugin-example", - AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, - AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - }} - index++ - err = state.CSIVolumeRegister(index, vols) - require.NoError(t, err) - vol, err := state.CSIVolumeByID(ws, ns, volId0) - require.NoError(t, err) - require.Len(t, vol.ReadAllocs, 0) - require.Len(t, vol.WriteAllocs, 0) - - // Create a job with 2 allocations - job := mock.Job() - job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{ - "_": { - Name: "someVolume", - Type: structs.VolumeTypeCSI, - Source: volId0, - ReadOnly: false, - }, - } - index++ - err = state.UpsertJob(index, job) - require.NoError(t, err) - - alloc1 := mock.Alloc() - alloc1.JobID = job.ID - alloc1.NodeID = node.ID - index++ - err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID)) - require.NoError(t, err) - alloc1.TaskGroup = job.TaskGroups[0].Name - - alloc2 := mock.Alloc() - alloc2.JobID = job.ID - alloc2.NodeID = node.ID - index++ - err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID)) - require.NoError(t, err) - alloc2.TaskGroup = job.TaskGroups[0].Name - - index++ - err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2}) - require.NoError(t, err) - - // Claim the volumes and verify the claims were set. We need to - // apply this through the FSM so that we make sure the index is - // properly updated to test later - batch := &structs.CSIVolumeClaimBatchRequest{ - Claims: []structs.CSIVolumeClaimRequest{ - { - VolumeID: volId0, - AllocationID: alloc1.ID, - NodeID: alloc1.NodeID, - Claim: structs.CSIVolumeClaimWrite, - }, - { - VolumeID: volId0, - AllocationID: alloc2.ID, - NodeID: alloc2.NodeID, - Claim: structs.CSIVolumeClaimRead, - }, - }} - _, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch) - require.NoError(t, err) - - vol, err = state.CSIVolumeByID(ws, ns, volId0) - require.NoError(t, err) - require.Len(t, vol.ReadAllocs, 1) - require.Len(t, vol.WriteAllocs, 1) - - // Update the 1st alloc as terminal/failed - alloc1.ClientStatus = structs.AllocClientStatusFailed - err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", - &structs.AllocUpdateRequest{ - Alloc: []*structs.Allocation{alloc1}, - WriteRequest: structs.WriteRequest{Region: "global"}, - }, &structs.NodeAllocsResponse{}) - require.NoError(t, err) - - // Lookup the alloc and verify status was updated - out, err := state.AllocByID(ws, alloc1.ID) - require.NoError(t, err) - require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus) - - // Verify the index has been updated to trigger a volume claim release - - req := &structs.CSIVolumeGetRequest{ID: volId0} - req.Region = "global" - getResp := &structs.CSIVolumeGetResponse{} - err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp) - require.NoError(t, err) - require.Greater(t, getResp.Volume.ModifyIndex, lastIndex) -} - func TestClientEndpoint_CreateNodeEvals(t *testing.T) { t.Parallel() diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index cf4fc06fb20e..ca413a9ff6f4 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -168,10 +168,6 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool { func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error { - if len(vol.PastClaims) == 0 { - return nil - } - // PastClaims written by a volume GC core job will have no allocation, // so we need to find out which allocs are eligible for cleanup. for _, claim := range vol.PastClaims {