From ad7cbca1bc06cbfad559fa833b9fe0623a905a0d Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 29 Jul 2020 16:31:18 -0400
Subject: [PATCH] csi: remove async volume GC from node update

---
 nomad/node_endpoint.go      |  24 +------
 nomad/node_endpoint_test.go | 126 ------------------------------------
 2 files changed, 2 insertions(+), 148 deletions(-)

diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index c83e463127c6..97af9aafc7b5 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation
 
-	// A set of de-duplicated volumes that need their volume claims released.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)
-
 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
 
@@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}
 
-		// If the terminal alloc has CSI volumes, add the volumes to the batch
-		// of volumes we'll release the claims of.
-		for _, vol := range taskGroup.Volumes {
-			if vol.Type == structs.VolumeTypeCSI {
-				volumesToGC.add(vol.Source, alloc.Namespace)
-			}
-		}
-
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
 		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
 			eval := &structs.Evaluation{
@@ -1140,13 +1128,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 		}
 	}
 
-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err := volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// Add this to the batch
 	n.updatesLock.Lock()
 	n.updates = append(n.updates, args.Alloc...)
@@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 
 	// Wait for the future
 	if err := future.Wait(); err != nil {
-		result = multierror.Append(result, err)
-		return result.ErrorOrNil()
+		return err
 	}
 
 	// Setup the response
 	reply.Index = future.Index()
-	return result.ErrorOrNil()
+	return nil
 }
 
 // batchUpdate is used to update all the allocations
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 7b0c13b492d2..1be5728c598b 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2313,132 +2313,6 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
 	}
 }
 
-func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
-	t.Parallel()
-	srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
-	defer shutdown()
-	testutil.WaitForLeader(t, srv.RPC)
-
-	codec := rpcClient(t, srv)
-	state := srv.fsm.State()
-
-	index := uint64(0)
-	ws := memdb.NewWatchSet()
-
-	// Create a client node, plugin, and volume
-	node := mock.Node()
-	node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
-	node.CSINodePlugins = map[string]*structs.CSIInfo{
-		"csi-plugin-example": {PluginID: "csi-plugin-example",
-			Healthy:        true,
-			NodeInfo:       &structs.CSINodeInfo{},
-			ControllerInfo: &structs.CSIControllerInfo{},
-		},
-	}
-	index++
-	err := state.UpsertNode(index, node)
-	require.NoError(t, err)
-	volId0 := uuid.Generate()
-	ns := structs.DefaultNamespace
-	vols := []*structs.CSIVolume{{
-		ID:             volId0,
-		Namespace:      ns,
-		PluginID:       "csi-plugin-example",
-		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
-		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-	}}
-	index++
-	err = state.CSIVolumeRegister(index, vols)
-	require.NoError(t, err)
-	vol, err := state.CSIVolumeByID(ws, ns, volId0)
-	require.NoError(t, err)
-	require.Len(t, vol.ReadAllocs, 0)
-	require.Len(t, vol.WriteAllocs, 0)
-
-	// Create a job with 2 allocations
-	job := mock.Job()
-	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
-		"_": {
-			Name:     "someVolume",
-			Type:     structs.VolumeTypeCSI,
-			Source:   volId0,
-			ReadOnly: false,
-		},
-	}
-	index++
-	err = state.UpsertJob(index, job)
-	require.NoError(t, err)
-
-	alloc1 := mock.Alloc()
-	alloc1.JobID = job.ID
-	alloc1.NodeID = node.ID
-	index++
-	err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
-	require.NoError(t, err)
-	alloc1.TaskGroup = job.TaskGroups[0].Name
-
-	alloc2 := mock.Alloc()
-	alloc2.JobID = job.ID
-	alloc2.NodeID = node.ID
-	index++
-	err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
-	require.NoError(t, err)
-	alloc2.TaskGroup = job.TaskGroups[0].Name
-
-	index++
-	err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
-	require.NoError(t, err)
-
-	// Claim the volumes and verify the claims were set. We need to
-	// apply this through the FSM so that we make sure the index is
-	// properly updated to test later
-	batch := &structs.CSIVolumeClaimBatchRequest{
-		Claims: []structs.CSIVolumeClaimRequest{
-			{
-				VolumeID:     volId0,
-				AllocationID: alloc1.ID,
-				NodeID:       alloc1.NodeID,
-				Claim:        structs.CSIVolumeClaimWrite,
-			},
-			{
-				VolumeID:     volId0,
-				AllocationID: alloc2.ID,
-				NodeID:       alloc2.NodeID,
-				Claim:        structs.CSIVolumeClaimRead,
-			},
-		}}
-	_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
-	require.NoError(t, err)
-
-	vol, err = state.CSIVolumeByID(ws, ns, volId0)
-	require.NoError(t, err)
-	require.Len(t, vol.ReadAllocs, 1)
-	require.Len(t, vol.WriteAllocs, 1)
-
-	// Update the 1st alloc as terminal/failed
-	alloc1.ClientStatus = structs.AllocClientStatusFailed
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
-		&structs.AllocUpdateRequest{
-			Alloc:        []*structs.Allocation{alloc1},
-			WriteRequest: structs.WriteRequest{Region: "global"},
-		}, &structs.NodeAllocsResponse{})
-	require.NoError(t, err)
-
-	// Lookup the alloc and verify status was updated
-	out, err := state.AllocByID(ws, alloc1.ID)
-	require.NoError(t, err)
-	require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)
-
-	// Verify the index has been updated to trigger a volume claim release
-
-	req := &structs.CSIVolumeGetRequest{ID: volId0}
-	req.Region = "global"
-	getResp := &structs.CSIVolumeGetResponse{}
-	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
-	require.NoError(t, err)
-	require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
-}
-
 func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
 	t.Parallel()
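
Note on the removed pattern: the deleted newCSIBatchRelease helper collected the
CSI volumes of terminal allocations into a de-duplicated set keyed by volume ID
and namespace, then released their claims through raft in bounded batches (the
call deleted above capped batches at 100). The standalone Go program below is a
minimal sketch of that batching pattern, not Nomad's actual implementation: the
volumeClaimBatcher type, its applyFn callback, and the use of the standard
library's errors.Join in place of github.com/hashicorp/go-multierror are all
illustrative assumptions.

package main

import (
	"errors"
	"fmt"
)

// volumeKey identifies a volume; using it as a map key de-duplicates
// repeated releases of the same (namespace, volume ID) pair.
type volumeKey struct {
	namespace string
	volumeID  string
}

// volumeClaimBatcher (hypothetical) accumulates volumes whose claims
// should be released and flushes them in chunks of at most batchSize.
type volumeClaimBatcher struct {
	volumes   map[volumeKey]struct{}
	batchSize int
	// applyFn stands in for the raft apply the real server would make.
	applyFn func(batch []volumeKey) error
}

func newVolumeClaimBatcher(batchSize int, applyFn func([]volumeKey) error) *volumeClaimBatcher {
	return &volumeClaimBatcher{
		volumes:   map[volumeKey]struct{}{},
		batchSize: batchSize,
		applyFn:   applyFn,
	}
}

// add records a volume to release; duplicates collapse into one entry.
func (b *volumeClaimBatcher) add(volID, namespace string) {
	b.volumes[volumeKey{namespace: namespace, volumeID: volID}] = struct{}{}
}

// apply flushes all recorded volumes in batchSize-bounded chunks,
// accumulating errors instead of stopping at the first failure.
func (b *volumeClaimBatcher) apply() error {
	var errs []error
	batch := make([]volumeKey, 0, b.batchSize)
	for key := range b.volumes {
		batch = append(batch, key)
		if len(batch) == b.batchSize {
			if err := b.applyFn(batch); err != nil {
				errs = append(errs, err)
			}
			// Fresh slice per flush, in case applyFn retains its argument.
			batch = make([]volumeKey, 0, b.batchSize)
		}
	}
	if len(batch) > 0 {
		if err := b.applyFn(batch); err != nil {
			errs = append(errs, err)
		}
	}
	// The removed code aggregated errors with go-multierror; errors.Join
	// (Go 1.20+) plays the same role here.
	return errors.Join(errs...)
}

func main() {
	batcher := newVolumeClaimBatcher(100, func(batch []volumeKey) error {
		fmt.Printf("releasing claims for %d volume(s)\n", len(batch))
		return nil
	})
	batcher.add("vol-0", "default")
	batcher.add("vol-0", "default") // de-duplicated with the line above
	batcher.add("vol-1", "default")
	if err := batcher.apply(); err != nil {
		fmt.Println("claim release failed:", err)
	}
}

Keying the set by (namespace, volume ID) means an allocation set touching the
same volume several times produces a single release, and the batch cap bounds
the size of any one raft entry. Why the mechanism can be dropped from the
Node.UpdateAlloc RPC path is outside this diff; the commit subject only states
that node updates no longer trigger volume GC here.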