csi: don't pass volume claim releases thru GC eval
Following the new volumewatcher in #7794 and performance improvements
to it that landed afterwards, there's no particular reason we should
be threading claim releases through the GC eval rather than writing an
empty `CSIVolumeClaimRequest` with the mode set to
`CSIVolumeClaimRelease`, just as the GC evaluation would do.

Also, by batching up these raft messages, we reduce the number of raft
writes and cross-server RPCs by one each per volume whose claims we
release.
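
In practice that release is just a claim write with no allocation or node
attached. As a minimal sketch of the direct raft write this commit switches
to, using the types added below — here `srv` and `logger` stand in for the
leader's `*Server` and its logger, and the volume ID and namespace are
placeholder values:

// Sketch only: release a volume's claims directly via raft rather than
// creating a GC evaluation. "vol-example" and "default" are placeholders.
req := structs.CSIVolumeClaimRequest{
    VolumeID: "vol-example",
    Claim:    structs.CSIVolumeClaimRelease,
}
req.Namespace = "default"
req.Region = srv.config.Region

// A single raft write can carry many such requests.
batch := &structs.CSIVolumeClaimBatchRequest{
    Claims: []structs.CSIVolumeClaimRequest{req},
}
if _, _, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch); err != nil {
    logger.Error("csi raft apply failed", "error", err)
}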
tgross committed May 20, 2020
1 parent 36684bd commit a012a56
Showing 4 changed files with 161 additions and 108 deletions.
73 changes: 73 additions & 0 deletions nomad/csi_batch.go
@@ -0,0 +1,73 @@
package nomad

import (
    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/nomad/nomad/structs"
)

// csiBatchRelease is a helper for any time we need to release a bunch
// of volume claims at once. It de-duplicates the volumes and batches
// the raft messages into manageable chunks. Intended for use by RPCs
// that have already been forwarded to the leader.
type csiBatchRelease struct {
    srv    *Server
    logger log.Logger

    maxBatchSize int
    seen         map[string]struct{}
    batches      []*structs.CSIVolumeClaimBatchRequest
}

func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
    return &csiBatchRelease{
        srv:          srv,
        logger:       logger,
        maxBatchSize: max,
        seen:         map[string]struct{}{},
        batches:      []*structs.CSIVolumeClaimBatchRequest{{}},
    }
}

// add the volume ID + namespace to the deduplicated batches
func (c *csiBatchRelease) add(vol, namespace string) {
    id := vol + namespace

    // ignore duplicates
    _, seen := c.seen[id]
    if seen {
        return
    }
    c.seen[id] = struct{}{}

    req := structs.CSIVolumeClaimRequest{
        VolumeID: vol,
        Claim:    structs.CSIVolumeClaimRelease,
    }
    req.Namespace = namespace
    req.Region = c.srv.config.Region

    for _, batch := range c.batches {
        // otherwise append to the first non-full batch
        if len(batch.Claims) < c.maxBatchSize {
            batch.Claims = append(batch.Claims, req)
            return
        }
    }
    // no non-full batch found, make a new one
    newBatch := &structs.CSIVolumeClaimBatchRequest{
        Claims: []structs.CSIVolumeClaimRequest{req}}
    c.batches = append(c.batches, newBatch)
}

// apply flushes the batches to raft
func (c *csiBatchRelease) apply() error {
    for _, batch := range c.batches {
        if len(batch.Claims) > 0 {
            _, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
            if err != nil {
                c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
                return err
            }
        }
    }
    return nil
}
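
A rough usage sketch of the helper from a leader-forwarded RPC handler,
mirroring the Deregister change below; the error handling here is
illustrative (the endpoints below rely on apply logging its own failures):

volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)

// Collect every CSI volume whose claims should be released; the helper
// ignores duplicate volume ID + namespace pairs.
for _, tg := range job.TaskGroups {
    for _, vol := range tg.Volumes {
        if vol.Type == structs.VolumeTypeCSI {
            volumesToGC.add(vol.Source, job.Namespace)
        }
    }
}

// Flush the accumulated claims to raft in one or more batched writes.
if err := volumesToGC.apply(); err != nil {
    j.logger.Error("csi claim release failed", "error", err)
}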
98 changes: 38 additions & 60 deletions nomad/job_endpoint.go
@@ -707,19 +707,20 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}

// For a job with volumes, find its volumes before deleting the job
volumesToGC := make(map[string]*structs.VolumeRequest)
// For a job with volumes, find its volumes before deleting the job.
// Later we'll apply this via Raft.
volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
if job != nil {
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source] = vol
volumesToGC.add(vol.Source, job.Namespace)
}
}
}
}

// Commit this update via Raft
// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.logger.Error("deregister failed", "error", err)
@@ -729,69 +730,46 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Populate the reply with job information
reply.JobModifyIndex = index

evals := []*structs.Evaluation{}
now := time.Now().UTC().UnixNano()
// Make a raft apply to release the CSI volume claims of terminal allocs.
volumesToGC.apply()

// Add an evaluation for garbage collecting the CSI volume claims
// of terminal allocs
for _, vol := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source,
LeaderACL: j.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

// If the job is periodic or parameterized, we don't create an eval
// for the job, but might still need one for the volumes
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

if len(evals) > 0 {
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}

// Populate the reply with eval information
reply.EvalID = evals[len(evals)-1].ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}

33 changes: 8 additions & 25 deletions nomad/node_endpoint.go
@@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation

// A set of de-duplicated volumes that need volume claim GC.
// Later we'll create a gc eval for each volume.
volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]
// A set of de-duplicated volumes that need their volume claims released.
// Later we'll apply this via Raft.
volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)

for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1113,11 +1113,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}

// If the terminal alloc has CSI volumes, add its job to the list
// of jobs we're going to call volume claim GC on.
// If the terminal alloc has CSI volumes, add them to the batch
// of volumes whose claims we'll release.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
volumesToGC.add(vol.Source, alloc.Namespace)
}
}

@@ -1138,25 +1138,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
}

// Add an evaluation for garbage collecting the CSI volume claims
// of terminal allocs
for _, volAndNamespace := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: volAndNamespace[1],
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0],
LeaderACL: n.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
}
// Make a raft apply to release the CSI volume claims of terminal allocs.
volumesToGC.apply()

// Add this to the batch
n.updatesLock.Lock()
65 changes: 42 additions & 23 deletions nomad/node_endpoint_test.go
@@ -2321,6 +2321,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {

codec := rpcClient(t, srv)
state := srv.fsm.State()

index := uint64(0)
ws := memdb.NewWatchSet()

// Create a client node, plugin, and volume
@@ -2333,7 +2335,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
ControllerInfo: &structs.CSIControllerInfo{},
},
}
err := state.UpsertNode(99, node)
index++
err := state.UpsertNode(index, node)
require.NoError(t, err)
volId0 := uuid.Generate()
ns := structs.DefaultNamespace
@@ -2344,7 +2347,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
err = state.CSIVolumeRegister(100, vols)
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
@@ -2361,39 +2365,51 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
ReadOnly: false,
},
}
err = state.UpsertJob(101, job)
index++
err = state.UpsertJob(index, job)
require.NoError(t, err)

alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name

alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name

err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(105, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Mode: structs.CSIVolumeClaimWrite,
})
require.NoError(t, err)
err = state.CSIVolumeClaim(106, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Mode: structs.CSIVolumeClaimRead,
})
// Claim the volumes and verify the claims were set. We apply this
// through the FSM so that the raft index is updated, which the check
// at the end of the test depends on.
batch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{
{
VolumeID: volId0,
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Claim: structs.CSIVolumeClaimWrite,
},
{
VolumeID: volId0,
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Claim: structs.CSIVolumeClaimRead,
},
}}
_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
require.NoError(t, err)

vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
@@ -2413,11 +2429,14 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)

// Verify the eval for the claim GC was emitted
// Lookup the evaluations
eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0)
require.NotNil(t, eval)
require.Nil(t, err)
// Verify the index has been updated to trigger a volume claim release

req := &structs.CSIVolumeGetRequest{ID: volId0}
req.Region = "global"
getResp := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
require.NoError(t, err)
require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
}

func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
