diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index 0fccc51b5f49..dd41c3dd7eca 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	hclog "github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/drivers"
@@ -15,12 +16,13 @@ import (
 //
 // It is a noop for allocs that do not depend on CSI Volumes.
 type csiHook struct {
-	ar         *allocRunner
-	alloc      *structs.Allocation
-	logger     hclog.Logger
-	csimanager csimanager.Manager
-	rpcClient  RPCer
-	updater    hookResourceSetter
+	ar             *allocRunner
+	alloc          *structs.Allocation
+	logger         hclog.Logger
+	csimanager     csimanager.Manager
+	rpcClient      RPCer
+	updater        hookResourceSetter
+	volumeRequests map[string]*volumeAndRequest
 }
 
 func (c *csiHook) Name() string {
@@ -43,6 +45,7 @@ func (c *csiHook) Prerun() error {
 	if err != nil {
 		return fmt.Errorf("claim volumes: %v", err)
 	}
+	c.volumeRequests = volumes
 
 	mounts := make(map[string]*csimanager.MountInfo, len(volumes))
 	for alias, pair := range volumes {
@@ -73,6 +76,37 @@ func (c *csiHook) Prerun() error {
 	return nil
 }
 
+// Postrun sends an RPC to the server to unpublish the volume. This may
+// forward client RPCs to the node plugins or to the controller plugins,
+// depending on whether other allocations on this node have claims on this
+// volume.
+func (c *csiHook) Postrun() error {
+	if !c.shouldRun() {
+		return nil
+	}
+
+	var mErr *multierror.Error
+
+	for _, pair := range c.volumeRequests {
+		req := &structs.CSIVolumeUnpublishRequest{
+			VolumeID: pair.request.Source,
+			Claim: &structs.CSIVolumeClaim{
+				AllocationID: c.alloc.ID,
+				NodeID:       c.alloc.NodeID,
+				Mode:         structs.CSIVolumeClaimRelease,
+			},
+			WriteRequest: structs.WriteRequest{
+				Region: c.alloc.Job.Region, Namespace: c.alloc.Job.Namespace},
+		}
+		err := c.rpcClient.RPC("CSIVolume.Unpublish",
+			req, &structs.CSIVolumeUnpublishResponse{})
+		if err != nil {
+			mErr = multierror.Append(mErr, err)
+		}
+	}
+	return mErr.ErrorOrNil()
+}
+
 type volumeAndRequest struct {
 	volume  *structs.CSIVolume
 	request *structs.VolumeRequest
diff --git a/nomad/config.go b/nomad/config.go
index 2e25e047168d..14e92adf33fb 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -402,7 +402,7 @@ func DefaultConfig() *Config {
 		CSIPluginGCInterval:           5 * time.Minute,
 		CSIPluginGCThreshold:          1 * time.Hour,
 		CSIVolumeClaimGCInterval:      5 * time.Minute,
-		CSIVolumeClaimGCThreshold:     1 * time.Hour,
+		CSIVolumeClaimGCThreshold:     5 * time.Minute,
 		EvalNackTimeout:               60 * time.Second,
 		EvalDeliveryLimit:             3,
 		EvalNackInitialReenqueueDelay: 1 * time.Second,
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index c2d3a1463d7d..5e47b9184cdc 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -783,7 +783,7 @@ NEXT_VOLUME:
 		if err != nil {
 			return err
 		}
-		if alloc == nil {
+		if alloc == nil || alloc.TerminalStatus() {
 			err = gcClaims(vol.Namespace, vol.ID)
 			if err != nil {
 				return err
@@ -796,7 +796,7 @@ NEXT_VOLUME:
 		if err != nil {
 			return err
 		}
-		if alloc == nil {
+		if alloc == nil || alloc.TerminalStatus() {
 			err = gcClaims(vol.Namespace, vol.ID)
 			if err != nil {
 				return err
diff --git a/nomad/csi_batch.go b/nomad/csi_batch.go
deleted file mode 100644
index 50e36e7a6c40..000000000000
--- a/nomad/csi_batch.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package nomad
-
-import (
-	log "github.com/hashicorp/go-hclog"
-	multierror "github.com/hashicorp/go-multierror"
-	"github.com/hashicorp/nomad/nomad/structs"
-)
-
-// csiBatchRelease is a helper for any time we need to release a bunch
-// of volume claims at once. It de-duplicates the volumes and batches
-// the raft messages into manageable chunks. Intended for use by RPCs
-// that have already been forwarded to the leader.
-type csiBatchRelease struct {
-	srv    *Server
-	logger log.Logger
-
-	maxBatchSize int
-	seen         map[string]struct{}
-	batches      []*structs.CSIVolumeClaimBatchRequest
-}
-
-func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
-	return &csiBatchRelease{
-		srv:          srv,
-		logger:       logger,
-		maxBatchSize: max,
-		seen:         map[string]struct{}{},
-		batches:      []*structs.CSIVolumeClaimBatchRequest{{}},
-	}
-}
-
-// add the volume ID + namespace to the deduplicated batches
-func (c *csiBatchRelease) add(vol, namespace string) {
-	id := vol + "\x00" + namespace
-
-	// ignore duplicates
-	_, seen := c.seen[id]
-	if seen {
-		return
-	}
-	c.seen[id] = struct{}{}
-
-	req := structs.CSIVolumeClaimRequest{
-		VolumeID: vol,
-		Claim:    structs.CSIVolumeClaimRelease,
-	}
-	req.Namespace = namespace
-	req.Region = c.srv.config.Region
-
-	for _, batch := range c.batches {
-		// otherwise append to the first non-full batch
-		if len(batch.Claims) < c.maxBatchSize {
-			batch.Claims = append(batch.Claims, req)
-			return
-		}
-	}
-	// no non-full batch found, make a new one
-	newBatch := &structs.CSIVolumeClaimBatchRequest{
-		Claims: []structs.CSIVolumeClaimRequest{req}}
-	c.batches = append(c.batches, newBatch)
-}
-
-// apply flushes the batches to raft
-func (c *csiBatchRelease) apply() error {
-	var result *multierror.Error
-	for _, batch := range c.batches {
-		if len(batch.Claims) > 0 {
-			_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
-			if err != nil {
-				c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
-				result = multierror.Append(result, err)
-			}
-		}
-	}
-	return result.ErrorOrNil()
-}
diff --git a/nomad/csi_batch_test.go b/nomad/csi_batch_test.go
deleted file mode 100644
index 565fef591e30..000000000000
--- a/nomad/csi_batch_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package nomad
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestCSI_Batcher(t *testing.T) {
-	t.Parallel()
-	srv, shutdown := TestServer(t, func(c *Config) {
-		c.NumSchedulers = 0 // Prevent automatic dequeue
-	})
-	defer shutdown()
-
-	batcher := newCSIBatchRelease(srv, nil, 5)
-
-	batcher.add("vol0", "global")
-	batcher.add("vol", "0global")
-	batcher.add("vol1", "global")
-	batcher.add("vol1", "global")
-	batcher.add("vol2", "global")
-	batcher.add("vol2", "other")
-	batcher.add("vol3", "global")
-	batcher.add("vol4", "global")
-	batcher.add("vol5", "global")
-	batcher.add("vol6", "global")
-
-	require.Len(t, batcher.batches, 2)
-	require.Len(t, batcher.batches[0].Claims, 5, "first batch")
-	require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2")
-	require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other")
-	require.Len(t, batcher.batches[1].Claims, 4, "second batch")
-}
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 834ead17bba0..a6b43798a296 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -780,19 +780,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 		return err
 	}
 
-	// For a job with volumes, find its volumes before deleting the job.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
-	if job != nil {
-		for _, tg := range job.TaskGroups {
-			for _, vol := range tg.Volumes {
-				if vol.Type == structs.VolumeTypeCSI {
-					volumesToGC.add(vol.Source, job.Namespace)
-				}
-			}
-		}
-	}
-
 	var eval *structs.Evaluation
 
 	// The job priority / type is strange for this, since it's not a high
@@ -832,13 +819,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 	reply.EvalCreateIndex = index
 	reply.Index = index
 
-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err = volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// COMPAT(1.1.0) - Remove entire conditional block
 	// 0.12.1 introduced atomic job deregistration eval
 	if eval != nil && args.Eval == nil {
@@ -852,16 +832,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
 		// Commit this evaluation via Raft
 		_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
 		if err != nil {
-			result = multierror.Append(result, err)
 			j.logger.Error("eval create failed", "error", err, "method", "deregister")
-			return result.ErrorOrNil()
+			return err
 		}
 
 		reply.EvalCreateIndex = evalIndex
 		reply.Index = evalIndex
 	}
 
-	return result.ErrorOrNil()
+	return nil
 }
 
 // BatchDeregister is used to remove a set of jobs from the cluster.
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index c83e463127c6..97af9aafc7b5 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation
 
-	// A set of de-duplicated volumes that need their volume claims released.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)
-
 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
 
@@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}
 
-		// If the terminal alloc has CSI volumes, add the volumes to the batch
-		// of volumes we'll release the claims of.
-		for _, vol := range taskGroup.Volumes {
-			if vol.Type == structs.VolumeTypeCSI {
-				volumesToGC.add(vol.Source, alloc.Namespace)
-			}
-		}
-
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
 		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
 			eval := &structs.Evaluation{
@@ -1140,13 +1128,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 		}
 	}
 
-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err := volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// Add this to the batch
 	n.updatesLock.Lock()
 	n.updates = append(n.updates, args.Alloc...)
@@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 
 	// Wait for the future
 	if err := future.Wait(); err != nil {
-		result = multierror.Append(result, err)
-		return result.ErrorOrNil()
+		return err
 	}
 
 	// Setup the response
 	reply.Index = future.Index()
-	return result.ErrorOrNil()
+	return nil
 }
 
 // batchUpdate is used to update all the allocations
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 7b0c13b492d2..1be5728c598b 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2313,132 +2313,6 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
 	}
 }
 
-func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
-	t.Parallel()
-	srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
-	defer shutdown()
-	testutil.WaitForLeader(t, srv.RPC)
-
-	codec := rpcClient(t, srv)
-	state := srv.fsm.State()
-
-	index := uint64(0)
-	ws := memdb.NewWatchSet()
-
-	// Create a client node, plugin, and volume
-	node := mock.Node()
-	node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
-	node.CSINodePlugins = map[string]*structs.CSIInfo{
-		"csi-plugin-example": {PluginID: "csi-plugin-example",
-			Healthy:        true,
-			NodeInfo:       &structs.CSINodeInfo{},
-			ControllerInfo: &structs.CSIControllerInfo{},
-		},
-	}
-	index++
-	err := state.UpsertNode(index, node)
-	require.NoError(t, err)
-	volId0 := uuid.Generate()
-	ns := structs.DefaultNamespace
-	vols := []*structs.CSIVolume{{
-		ID:             volId0,
-		Namespace:      ns,
-		PluginID:       "csi-plugin-example",
-		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
-		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-	}}
-	index++
-	err = state.CSIVolumeRegister(index, vols)
-	require.NoError(t, err)
-	vol, err := state.CSIVolumeByID(ws, ns, volId0)
-	require.NoError(t, err)
-	require.Len(t, vol.ReadAllocs, 0)
-	require.Len(t, vol.WriteAllocs, 0)
-
-	// Create a job with 2 allocations
-	job := mock.Job()
-	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
-		"_": {
-			Name:     "someVolume",
-			Type:     structs.VolumeTypeCSI,
-			Source:   volId0,
-			ReadOnly: false,
-		},
-	}
-	index++
-	err = state.UpsertJob(index, job)
-	require.NoError(t, err)
-
-	alloc1 := mock.Alloc()
-	alloc1.JobID = job.ID
-	alloc1.NodeID = node.ID
-	index++
-	err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
-	require.NoError(t, err)
-	alloc1.TaskGroup = job.TaskGroups[0].Name
-
-	alloc2 := mock.Alloc()
-	alloc2.JobID = job.ID
-	alloc2.NodeID = node.ID
-	index++
-	err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
-	require.NoError(t, err)
-	alloc2.TaskGroup = job.TaskGroups[0].Name
-
-	index++
-	err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
-	require.NoError(t, err)
-
-	// Claim the volumes and verify the claims were set. We need to
-	// apply this through the FSM so that we make sure the index is
-	// properly updated to test later
-	batch := &structs.CSIVolumeClaimBatchRequest{
-		Claims: []structs.CSIVolumeClaimRequest{
-			{
-				VolumeID:     volId0,
-				AllocationID: alloc1.ID,
-				NodeID:       alloc1.NodeID,
-				Claim:        structs.CSIVolumeClaimWrite,
-			},
-			{
-				VolumeID:     volId0,
-				AllocationID: alloc2.ID,
-				NodeID:       alloc2.NodeID,
-				Claim:        structs.CSIVolumeClaimRead,
-			},
-		}}
-	_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
-	require.NoError(t, err)
-
-	vol, err = state.CSIVolumeByID(ws, ns, volId0)
-	require.NoError(t, err)
-	require.Len(t, vol.ReadAllocs, 1)
-	require.Len(t, vol.WriteAllocs, 1)
-
-	// Update the 1st alloc as terminal/failed
-	alloc1.ClientStatus = structs.AllocClientStatusFailed
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
-		&structs.AllocUpdateRequest{
-			Alloc:        []*structs.Allocation{alloc1},
-			WriteRequest: structs.WriteRequest{Region: "global"},
-		}, &structs.NodeAllocsResponse{})
-	require.NoError(t, err)
-
-	// Lookup the alloc and verify status was updated
-	out, err := state.AllocByID(ws, alloc1.ID)
-	require.NoError(t, err)
-	require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)
-
-	// Verify the index has been updated to trigger a volume claim release
-
-	req := &structs.CSIVolumeGetRequest{ID: volId0}
-	req.Region = "global"
-	getResp := &structs.CSIVolumeGetResponse{}
-	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
-	require.NoError(t, err)
-	require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
-}
-
 func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
 	t.Parallel()
 
diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go
index cf4fc06fb20e..ca413a9ff6f4 100644
--- a/nomad/volumewatcher/volume_watcher.go
+++ b/nomad/volumewatcher/volume_watcher.go
@@ -168,10 +168,6 @@ func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
 
 func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {
 
-	if len(vol.PastClaims) == 0 {
-		return nil
-	}
-
 	// PastClaims written by a volume GC core job will have no allocation,
 	// so we need to find out which allocs are eligible for cleanup.
 	for _, claim := range vol.PastClaims {
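Note on the new client-side flow: csiHook.Postrun issues one CSIVolume.Unpublish RPC per claimed volume and keeps iterating when a call fails, collecting the failures with go-multierror so a single bad volume does not block cleanup of the others. The standalone Go sketch below illustrates that aggregation pattern outside of Nomad; the unpublish helper and the volume IDs are hypothetical stand-ins for the real RPC call and are not part of this patch.

package main

import (
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// unpublish is a hypothetical stand-in for the CSIVolume.Unpublish RPC that
// csiHook.Postrun issues once per claimed volume.
func unpublish(volID string) error {
	if volID == "vol-broken" {
		return fmt.Errorf("unpublish %s: controller unavailable", volID)
	}
	return nil
}

func main() {
	var mErr *multierror.Error

	// Try every volume even if an earlier one fails, mirroring the loop
	// over c.volumeRequests in Postrun.
	for _, volID := range []string{"vol-ok", "vol-broken", "vol-ok-2"} {
		if err := unpublish(volID); err != nil {
			mErr = multierror.Append(mErr, err)
		}
	}

	// ErrorOrNil is safe on a nil *multierror.Error and returns nil when
	// nothing was appended, so the happy path still reports success.
	if err := mErr.ErrorOrNil(); err != nil {
		fmt.Println("postrun failed:", err)
	}
}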