
Commit

CSI: no early return when feasibility check fails on eligible nodes (#13274)

As a performance optimization in the scheduler, feasibility checks
that apply to an entire node class are run only once for all nodes of
that class. The other feasibility checks are "available" checks: they
rely on more ephemeral characteristics and don't contribute to the
hash for the node class. This currently includes only CSI.
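
As a rough illustration of that split, here is a minimal, hypothetical Go
sketch (not the actual Nomad scheduler code): class-level results are cached
once per node class, while the ephemeral "available" check is re-run for
every node.

```go
package main

import "fmt"

type node struct {
	class     string
	available bool // ephemeral; not part of the node class hash
}

// classFeasible stands in for the feasibility checks that are hashed per
// node class; pretend only the "gpu" class passes them.
func classFeasible(class string) bool {
	return class == "gpu"
}

// feasibleNodes caches class-level results once per class and re-runs the
// ephemeral "available" check (e.g. CSI plugin health) on every node.
func feasibleNodes(nodes []node) []node {
	classCache := map[string]bool{}
	var out []node
	for _, n := range nodes {
		eligible, ok := classCache[n.class]
		if !ok {
			eligible = classFeasible(n.class)
			classCache[n.class] = eligible
		}
		if !eligible || !n.available {
			continue
		}
		out = append(out, n)
	}
	return out
}

func main() {
	nodes := []node{{"gpu", true}, {"gpu", false}, {"cpu", true}}
	fmt.Println(feasibleNodes(nodes)) // [{gpu true}]
}
```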

We have a separate fast path for "available" checks when the node has
already been marked eligible on the basis of class. This fast path has
a bug where it returns early rather than continuing the loop. This
causes the entire task group to be rejected.

Fix the bug by not returning early in the fast path and instead
jumping to the top of the loop, like all the other code paths in this
method. Includes a new test exercising topology at the whole-scheduler
level and a fix for an existing test that should've caught this
previously.
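
For context on the fix below, a minimal, hypothetical Go sketch (not the
scheduler's real `Next()` method) of why the labeled continue matters: an
early return on the first unavailable node abandons all remaining
candidates, while `continue OUTER` only skips that node.

```go
package main

import "fmt"

// pickFirstAvailable mirrors the shape of the fixed fast path: an
// unavailable node is skipped with a labeled continue instead of an early
// return that would reject every remaining node.
func pickFirstAvailable(nodes []string, available func(string) bool) string {
OUTER:
	for _, n := range nodes {
		if !available(n) {
			// Buggy variant: `return ""` here gives up after the first
			// unavailable node. The fix keeps iterating.
			continue OUTER
		}
		return n
	}
	return ""
}

func main() {
	avail := func(n string) bool { return n == "node-2" }
	fmt.Println(pickFirstAvailable([]string{"node-0", "node-1", "node-2"}, avail)) // node-2
}
```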
tgross authored and tbehling committed Jun 29, 2022
1 parent 911b8bb commit e1bf8c3
Showing 3 changed files with 93 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/13274.txt
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a scheduler bug where failed feasibility checks would return early and prevent processing additional nodes
```
5 changes: 2 additions & 3 deletions scheduler/feasible.go
@@ -1135,9 +1135,8 @@ OUTER:
 			if w.available(option) {
 				return option
 			}
-			// We match the class but are temporarily unavailable, the eval
-			// should be blocked
-			return nil
+			// We match the class but are temporarily unavailable
+			continue OUTER
 		case EvalComputedClassEscaped:
 			tgEscaped = true
 		case EvalComputedClassUnknown:
89 changes: 88 additions & 1 deletion scheduler/generic_sched_test.go
@@ -6353,7 +6353,7 @@ func TestServiceSched_CSIVolumesPerAlloc(t *testing.T) {
 	require.NotEqual("", h.Evals[1].BlockedEval,
 		"expected a blocked eval to be spawned")
 	require.Equal(2, h.Evals[1].QueuedAllocations["web"], "expected 2 queued allocs")
-	require.Equal(1, h.Evals[1].FailedTGAllocs["web"].
+	require.Equal(5, h.Evals[1].FailedTGAllocs["web"].
 		ConstraintFiltered["missing CSI Volume volume-unique[3]"])
 
 	// Upsert 2 more per-alloc volumes
@@ -6397,6 +6397,93 @@ func TestServiceSched_CSIVolumesPerAlloc(t *testing.T) {

}

func TestServiceSched_CSITopology(t *testing.T) {
	ci.Parallel(t)
	h := NewHarness(t)

	zones := []string{"zone-0", "zone-1", "zone-2", "zone-3"}

	// Create some nodes, each running a CSI plugin with topology for
	// a different "zone"
	for i := 0; i < 12; i++ {
		node := mock.Node()
		node.Datacenter = zones[i%4]
		node.CSINodePlugins = map[string]*structs.CSIInfo{
			"test-plugin-" + zones[i%4]: {
				PluginID: "test-plugin-" + zones[i%4],
				Healthy:  true,
				NodeInfo: &structs.CSINodeInfo{
					MaxVolumes: 3,
					AccessibleTopology: &structs.CSITopology{
						Segments: map[string]string{"zone": zones[i%4]}},
				},
			},
		}
		require.NoError(t, h.State.UpsertNode(
			structs.MsgTypeTestSetup, h.NextIndex(), node))
	}

	// create 2 per-alloc volumes for those zones
	vol0 := structs.NewCSIVolume("myvolume[0]", 0)
	vol0.PluginID = "test-plugin-zone-0"
	vol0.Namespace = structs.DefaultNamespace
	vol0.AccessMode = structs.CSIVolumeAccessModeSingleNodeWriter
	vol0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
	vol0.RequestedTopologies = &structs.CSITopologyRequest{
		Required: []*structs.CSITopology{{
			Segments: map[string]string{"zone": "zone-0"},
		}},
	}

	vol1 := vol0.Copy()
	vol1.ID = "myvolume[1]"
	vol1.PluginID = "test-plugin-zone-1"
	vol1.RequestedTopologies.Required[0].Segments["zone"] = "zone-1"

	require.NoError(t, h.State.UpsertCSIVolume(
		h.NextIndex(), []*structs.CSIVolume{vol0, vol1}))

	// Create a job that uses those volumes
	job := mock.Job()
	job.Datacenters = zones
	job.TaskGroups[0].Count = 2
	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
		"myvolume": {
			Type:     "csi",
			Name:     "unique",
			Source:   "myvolume",
			PerAlloc: true,
		},
	}

	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}

	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
		h.NextIndex(), []*structs.Evaluation{eval}))

	// Process the evaluation and expect a single plan without annotations
	err := h.Process(NewServiceScheduler, eval)
	require.NoError(t, err)
	require.Len(t, h.Plans, 1, "expected one plan")
	require.Nil(t, h.Plans[0].Annotations, "expected no annotations")

	// Expect the eval has not spawned a blocked eval
	require.Equal(t, len(h.CreateEvals), 0)
	require.Equal(t, "", h.Evals[0].BlockedEval, "did not expect a blocked eval")
	require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)

}

// TestPropagateTaskState asserts that propagateTaskState only copies state
// when the previous allocation is lost or draining.
func TestPropagateTaskState(t *testing.T) {
