Skip to content

Commit

Permalink
scheduler: retain eval metrics on port collision (hashicorp#19933)
Browse files Browse the repository at this point in the history
When an allocation can't be placed because of a port collision the
resulting blocked eval is expected to have a metric reporting the port
that caused the conflict, but this metrics was not being emitted when
preemption was enabled.
  • Loading branch information
lgfa29 authored and nvanthao committed Mar 1, 2024
1 parent 003b687 commit bdc69f8
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/19933.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug that caused blocked evaluations due to port conflict to not have a reason explaining why the evaluation was blocked
```
84 changes: 84 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,90 @@ func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) {
}
}

func TestJobEndpoint_Register_PortCollistion(t *testing.T) {
ci.Parallel(t)

testCases := []struct {
name string
configFn func(c *Config)
}{
{
name: "no preemption",
configFn: func(c *Config) {
c.DefaultSchedulerConfig = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
ServiceSchedulerEnabled: false,
},
}
},
},
{
name: "with preemption",
configFn: func(c *Config) {
c.DefaultSchedulerConfig = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
ServiceSchedulerEnabled: true,
},
}
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s1, cleanupS1 := TestServer(t, tc.configFn)
defer cleanupS1()
state := s1.fsm.State()

// Create test node.
node := mock.Node()
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node))

// Create test job with a static port.
job := mock.Job()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Networks[0].DynamicPorts = nil
job.TaskGroups[0].Networks[0].ReservedPorts = []structs.Port{
{Label: "http", Value: 80},
}
job.TaskGroups[0].Tasks[0].Services = nil

testutil.RegisterJob(t, s1.RPC, job)
testutil.WaitForJobAllocStatus(t, s1.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})

// Register second job with port conflict.
job2 := job.Copy()
job2.ID = fmt.Sprintf("conflict-%s", uuid.Generate())
job2.Name = job2.ID

testutil.RegisterJob(t, s1.RPC, job2)

// Wait for job registration eval to complete.
evals := testutil.WaitForJobEvalStatus(t, s1.RPC, job2, map[string]int{
structs.EvalStatusComplete: 1,
structs.EvalStatusBlocked: 1,
})

var blockedEval *structs.Evaluation
for _, e := range evals {
if e.Status == structs.EvalStatusBlocked {
blockedEval = e
break
}
}

// Ensure blocked eval is properly annotated.
must.MapLen(t, 1, blockedEval.FailedTGAllocs)
must.NotNil(t, blockedEval.FailedTGAllocs["web"])
must.Eq(t, map[string]int{
"network: reserved port collision http=80": 1,
}, blockedEval.FailedTGAllocs["web"].DimensionExhausted)
})
}
}

func TestJobEndpoint_Revert(t *testing.T) {
ci.Parallel(t)

Expand Down
8 changes: 8 additions & 0 deletions scheduler/rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ OUTER:
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
if netPreemptions == nil {
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
Expand All @@ -339,6 +341,8 @@ OUTER:
offer, err = netIdx.AssignPorts(ask)
if err != nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
Expand Down Expand Up @@ -392,6 +396,8 @@ OUTER:
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
if netPreemptions == nil {
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
Expand All @@ -409,6 +415,8 @@ OUTER:
offer, err = netIdx.AssignTaskNetwork(ask)
if offer == nil {
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
Expand Down
51 changes: 51 additions & 0 deletions testutil/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,57 @@ func WaitForJobAllocStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, a
return allocs
}

// WaitforJobEvalStatus blocks until the job's evals match the status described
// in the map of <Eval.Status>: <count>.
func WaitForJobEvalStatus(t testing.TB, rpc rpcFn, job *structs.Job, evalStatus map[string]int) []*structs.Evaluation {
return WaitForJobEvalStatusWithToken(t, rpc, job, evalStatus, "")
}

// WaitForJobEvalStatusWithToken is the same as WaitforJobEvalStatus with ACL
// enabled.
func WaitForJobEvalStatusWithToken(t testing.TB, rpc rpcFn, job *structs.Job, evalStatus map[string]int, token string) []*structs.Evaluation {
var evals []*structs.Evaluation

errorFunc := func() error {
req := &structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{
AuthToken: token,
Namespace: job.Namespace,
Region: job.Region,
},
}
var resp structs.JobEvaluationsResponse
err := rpc("Job.Evaluations", req, &resp)
if err != nil {
return fmt.Errorf("failed to call Job.Evaluations RPC: %w", err)
}

got := make(map[string]int)
for _, eval := range resp.Evaluations {
got[eval.Status]++
}

if diff := cmp.Diff(evalStatus, got); diff != "" {
return fmt.Errorf("eval status mismatch (-want +got):\n%s", diff)
}

evals = resp.Evaluations
return nil
}

must.Wait(t,
wait.InitialSuccess(
wait.ErrorFunc(errorFunc),
wait.Timeout(time.Duration(TestMultiplier())*time.Second),
wait.Gap(10*time.Millisecond),
),
must.Sprintf("failed to wait for job %s eval status", job.ID),
)

return evals
}

// WaitForFiles blocks until all the files in the slice are present
func WaitForFiles(t testing.TB, files []string) {
WaitForResult(func() (bool, error) {
Expand Down

0 comments on commit bdc69f8

Please sign in to comment.