Skip to content

Commit

Permalink
backport of commit d76273d
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Feb 10, 2023
1 parent a38eb37 commit 2c487fa
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .changelog/16112.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
eval broker: Fixed a bug where the cancelable eval reaper used an incorrect lock when getting the set of cancelable evals from the broker
```
4 changes: 2 additions & 2 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ func (b *EvalBroker) Stats() *BrokerStats {
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
b.l.RLock()
defer b.l.RUnlock()
b.l.Lock()
defer b.l.Unlock()

if batchSize > len(b.cancelable) {
batchSize = len(b.cancelable)
Expand Down
55 changes: 46 additions & 9 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -1475,6 +1477,31 @@ func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) {
must.Eq(t, 100, eval.ModifyIndex)
}

func TestEvalBroker_Cancelable(t *testing.T) {
ci.Parallel(t)

b := testBroker(t, time.Minute)

evals := []*structs.Evaluation{}
for i := 0; i < 20; i++ {
eval := mock.Eval()
evals = append(evals, eval)
}
b.cancelable = evals
b.stats.TotalCancelable = len(b.cancelable)

must.Len(t, 20, b.cancelable)
cancelable := b.Cancelable(10)
must.Len(t, 10, cancelable)
must.Len(t, 10, b.cancelable)
must.Eq(t, 10, b.stats.TotalCancelable)

cancelable = b.Cancelable(20)
must.Len(t, 10, cancelable)
must.Len(t, 0, b.cancelable)
must.Eq(t, 0, b.stats.TotalCancelable)
}

// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
// workflows
func TestEvalBroker_IntegrationTest(t *testing.T) {
Expand Down Expand Up @@ -1567,16 +1594,26 @@ func TestEvalBroker_IntegrationTest(t *testing.T) {

config := DefaultConfig()
config.NumSchedulers = 4
config.EvalReapCancelableInterval = time.Minute * 10
require.NoError(t, srv.Reload(config))
must.NoError(t, srv.Reload(config))

// assert that all but 2 evals were canceled and that the eval broker state
// has been cleared

require.Eventually(t, func() bool {
got := getEvalStatuses()
return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
}, 2*time.Second, time.Millisecond*100)
var got map[string]int

must.Wait(t, wait.InitialSuccess(
wait.Timeout(5*time.Second),
wait.Gap(100*time.Millisecond),
wait.BoolFunc(func() bool {
got = getEvalStatuses()
return got[structs.EvalStatusComplete] == 2 &&
got[structs.EvalStatusCancelled] == 9
}),
),
must.Func(func() string {
return fmt.Sprintf("expected map[complete:2 canceled:9] within timeout, got: %v with broker status=%#v", got, getStats())
}),
)

must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
TotalBlocked: 0, TotalCancelable: 0}, getStats())
Expand Down
4 changes: 3 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,8 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
return wakeCh
}

const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10

// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
// broker and updates their status to canceled.
func cancelCancelableEvals(srv *Server) error {
Expand All @@ -1032,7 +1034,7 @@ func cancelCancelableEvals(srv *Server) error {
// We *can* send larger raft logs but rough benchmarks show that a smaller
// page size strikes a balance between throughput and time we block the FSM
// apply for other operations
cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize)
if len(cancelable) > 0 {
for i, eval := range cancelable {
eval = eval.Copy()
Expand Down

0 comments on commit 2c487fa

Please sign in to comment.