
Commit

csi: remove async volume GC from node update
tgross committed Jul 31, 2020
1 parent a250c3d commit 0356e51
Showing 2 changed files with 2 additions and 148 deletions.
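
For context on what the diff removes: until this commit, Node.UpdateAlloc collected the CSI volumes of terminal allocations into a de-duplicating batch and released their claims through a Raft apply before returning. Only the helper's call sites are visible in the hunks below (newCSIBatchRelease(n.srv, n.logger, 100), add(volID, namespace), apply() error), so the following Go sketch is an illustrative reconstruction of that shape, not Nomad's actual implementation; the server, logger, and Raft submission are omitted or stubbed out.

// Illustrative sketch only: approximates the batch-release helper whose call
// sites are deleted below. The constructor's server and logger arguments and
// the Raft submission are omitted or stubbed; only the add/apply shape and the
// de-duplication are taken from the removed code.
package main

import "fmt"

type claimKey struct {
	volID     string
	namespace string
}

// csiBatchRelease collects volume claim releases, de-duplicated by volume and
// namespace, and submits them in fixed-size batches.
type csiBatchRelease struct {
	maxBatchSize int
	seen         map[claimKey]struct{}
	batches      [][]claimKey
}

func newCSIBatchRelease(maxBatchSize int) *csiBatchRelease {
	return &csiBatchRelease{
		maxBatchSize: maxBatchSize,
		seen:         make(map[claimKey]struct{}),
	}
}

// add records a volume/namespace pair once; a full batch starts a new one.
func (c *csiBatchRelease) add(volID, namespace string) {
	key := claimKey{volID: volID, namespace: namespace}
	if _, ok := c.seen[key]; ok {
		return // already queued for release
	}
	c.seen[key] = struct{}{}
	if n := len(c.batches); n == 0 || len(c.batches[n-1]) >= c.maxBatchSize {
		c.batches = append(c.batches, nil)
	}
	c.batches[len(c.batches)-1] = append(c.batches[len(c.batches)-1], key)
}

// apply submits every batch. In the removed code this was a Raft apply of the
// accumulated claim releases; here it only prints what would be submitted.
func (c *csiBatchRelease) apply() error {
	for i, batch := range c.batches {
		fmt.Printf("batch %d: release claims for %v\n", i, batch)
	}
	return nil
}

func main() {
	volumesToGC := newCSIBatchRelease(100)
	volumesToGC.add("vol-web", "default")
	volumesToGC.add("vol-web", "default") // de-duplicated
	volumesToGC.add("vol-db", "default")
	_ = volumesToGC.apply()
}

After this commit the UpdateAlloc RPC no longer performs any of this work; where claim release happens instead is not shown in this diff.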
24 changes: 2 additions & 22 deletions nomad/node_endpoint.go
@@ -1083,10 +1083,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	now := time.Now()
 	var evals []*structs.Evaluation
 
-	// A set of de-duplicated volumes that need their volume claims released.
-	// Later we'll apply this raft.
-	volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)
-
 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
 
@@ -1115,14 +1111,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}
 
-		// If the terminal alloc has CSI volumes, add the volumes to the batch
-		// of volumes we'll release the claims of.
-		for _, vol := range taskGroup.Volumes {
-			if vol.Type == structs.VolumeTypeCSI {
-				volumesToGC.add(vol.Source, alloc.Namespace)
-			}
-		}
-
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
 		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
 			eval := &structs.Evaluation{
@@ -1140,13 +1128,6 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 		}
 	}
 
-	// Make a raft apply to release the CSI volume claims of terminal allocs.
-	var result *multierror.Error
-	err := volumesToGC.apply()
-	if err != nil {
-		result = multierror.Append(result, err)
-	}
-
 	// Add this to the batch
 	n.updatesLock.Lock()
 	n.updates = append(n.updates, args.Alloc...)
@@ -1177,13 +1158,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 
 	// Wait for the future
 	if err := future.Wait(); err != nil {
-		result = multierror.Append(result, err)
-		return result.ErrorOrNil()
+		return err
 	}
 
 	// Setup the response
 	reply.Index = future.Index()
-	return result.ErrorOrNil()
+	return nil
 }
 
 // batchUpdate is used to update all the allocations
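With the batch release gone, UpdateAlloc has only one remaining error source (the Raft future), so the hunks above also drop the hashicorp/go-multierror accumulator in favor of plain `return err` and `return nil`. A minimal sketch of the pattern that was removed, with placeholder error sources standing in for the batch apply and the future wait:

package main

import (
	"errors"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// Placeholders for the two error sources the old code collected.
func applyClaimReleaseBatch() error { return errors.New("claim release failed") }
func waitForRaftFuture() error      { return nil }

// updateAllocOld mirrors the removed control flow: append each failure to a
// *multierror.Error and return ErrorOrNil, which is nil when nothing failed.
func updateAllocOld() error {
	var result *multierror.Error

	if err := applyClaimReleaseBatch(); err != nil {
		result = multierror.Append(result, err)
	}
	if err := waitForRaftFuture(); err != nil {
		result = multierror.Append(result, err)
		return result.ErrorOrNil()
	}
	return result.ErrorOrNil()
}

func main() {
	fmt.Println(updateAllocOld()) // reports the claim release failure
}

Once the only error left is the future's, the collector adds nothing, which is exactly what the `return err` / `return nil` hunks above reflect.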
126 changes: 0 additions & 126 deletions nomad/node_endpoint_test.go
@@ -2313,132 +2313,6 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

codec := rpcClient(t, srv)
state := srv.fsm.State()

index := uint64(0)
ws := memdb.NewWatchSet()

// Create a client node, plugin, and volume
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
node.CSINodePlugins = map[string]*structs.CSIInfo{
"csi-plugin-example": {PluginID: "csi-plugin-example",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
ControllerInfo: &structs.CSIControllerInfo{},
},
}
index++
err := state.UpsertNode(index, node)
require.NoError(t, err)
volId0 := uuid.Generate()
ns := structs.DefaultNamespace
vols := []*structs.CSIVolume{{
ID: volId0,
Namespace: ns,
PluginID: "csi-plugin-example",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)

// Create a job with 2 allocations
job := mock.Job()
job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"_": {
Name: "someVolume",
Type: structs.VolumeTypeCSI,
Source: volId0,
ReadOnly: false,
},
}
index++
err = state.UpsertJob(index, job)
require.NoError(t, err)

alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name

alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name

index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// Claim the volumes and verify the claims were set. We need to
// apply this through the FSM so that we make sure the index is
// properly updated to test later
batch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{
{
VolumeID: volId0,
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Claim: structs.CSIVolumeClaimWrite,
},
{
VolumeID: volId0,
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Claim: structs.CSIVolumeClaimRead,
},
}}
_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
require.NoError(t, err)

vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 1)

// Update the 1st alloc as terminal/failed
alloc1.ClientStatus = structs.AllocClientStatusFailed
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc",
&structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc1},
WriteRequest: structs.WriteRequest{Region: "global"},
}, &structs.NodeAllocsResponse{})
require.NoError(t, err)

// Lookup the alloc and verify status was updated
out, err := state.AllocByID(ws, alloc1.ID)
require.NoError(t, err)
require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)

// Verify the index has been updated to trigger a volume claim release

req := &structs.CSIVolumeGetRequest{ID: volId0}
req.Region = "global"
getResp := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
require.NoError(t, err)
require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
}

func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
t.Parallel()

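The deleted test, TestClientEndpoint_UpdateAlloc_UnclaimVolumes, verified the now-removed behavior by comparing Raft indexes: it claimed the volume through the FSM, remembered the index of that write, marked an alloc terminal via Node.UpdateAlloc, and then required the volume's ModifyIndex to have advanced past it. The sketch below shows only that verification pattern against a stand-in store saved in a _test.go file; fakeStore and its write method are assumptions, not Nomad APIs.

package main

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// fakeStore stands in for Nomad's state store: every write bumps a global
// index and stamps it onto the written object as its ModifyIndex.
type fakeStore struct {
	index       uint64
	modifyIndex map[string]uint64
}

func (s *fakeStore) write(id string) uint64 {
	s.index++
	s.modifyIndex[id] = s.index
	return s.index
}

func TestModifyIndexAdvancesAfterClaimRelease(t *testing.T) {
	store := &fakeStore{modifyIndex: map[string]uint64{}}

	// Claim the volume and remember the index of that write, as the deleted
	// test did via a CSIVolumeClaimBatchRequest raft apply.
	lastIndex := store.write("vol0")

	// The operation under test (releasing the claim after the alloc went
	// terminal) is another write, so it bumps the volume's ModifyIndex.
	store.write("vol0")

	// The same shape of assertion the deleted test made against CSIVolume.Get.
	require.Greater(t, store.modifyIndex["vol0"], lastIndex)
}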
