From 1eabc36e3a382845e03159c33e3793910bf6f7ec Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 10 Feb 2023 10:40:41 -0500
Subject: [PATCH] eval broker: use write lock when reaping cancelable evals (#16112)

The eval broker's `Cancelable` method, used by the cancelable eval reaper,
mutates the slice of cancelable evals by removing a batch at a time from the
slice. But this method unsafely uses a read lock despite this mutation. Under
normal workloads this is likely to be safe, but when the eval broker is under
the heavy load that this feature is intended to fix, we're likely to have a
race condition. Switch this to a write lock, as with the other locks that
protect mutations of the eval broker state.

This changeset also adjusts the test timeout to allow poorly-sized Actions
runners more time to schedule the appropriate goroutines. The test has been
updated to use `shoenig/test/wait` so we get sensible reporting of the
results rather than just a timeout error when things go wrong.
---
 .changelog/16112.txt      |  3 +++
 nomad/eval_broker.go      |  4 ++--
 nomad/eval_broker_test.go | 55 ++++++++++++++++++++++++++++++++-------
 nomad/leader.go           |  4 +++-
 4 files changed, 54 insertions(+), 12 deletions(-)
 create mode 100644 .changelog/16112.txt

diff --git a/.changelog/16112.txt b/.changelog/16112.txt
new file mode 100644
index 000000000000..c87e506483d2
--- /dev/null
+++ b/.changelog/16112.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+eval broker: Fixed a bug where the cancelable eval reaper used an incorrect lock when getting the set of cancelable evals from the broker
+```
diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go
index 0bfcdd9804b4..ca7907a1faa2 100644
--- a/nomad/eval_broker.go
+++ b/nomad/eval_broker.go
@@ -868,8 +868,8 @@ func (b *EvalBroker) Stats() *BrokerStats {
 // stale and ready to mark for canceling. The eval RPC will call this with a
 // batch size set to avoid sending overly large raft messages.
 func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
-	b.l.RLock()
-	defer b.l.RUnlock()
+	b.l.Lock()
+	defer b.l.Unlock()
 
 	if batchSize > len(b.cancelable) {
 		batchSize = len(b.cancelable)
diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go
index 897eee379f9c..2b93e498dd24 100644
--- a/nomad/eval_broker_test.go
+++ b/nomad/eval_broker_test.go
@@ -9,14 +9,16 @@ import (
 	"time"
 
 	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/shoenig/test"
+	"github.com/shoenig/test/must"
+	"github.com/shoenig/test/wait"
+	"github.com/stretchr/testify/require"
+
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
-	"github.com/shoenig/test"
-	"github.com/shoenig/test/must"
-	"github.com/stretchr/testify/require"
 )
 
 var (
@@ -1475,6 +1477,31 @@ func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) {
 	must.Eq(t, 100, eval.ModifyIndex)
 }
 
+func TestEvalBroker_Cancelable(t *testing.T) {
+	ci.Parallel(t)
+
+	b := testBroker(t, time.Minute)
+
+	evals := []*structs.Evaluation{}
+	for i := 0; i < 20; i++ {
+		eval := mock.Eval()
+		evals = append(evals, eval)
+	}
+	b.cancelable = evals
+	b.stats.TotalCancelable = len(b.cancelable)
+
+	must.Len(t, 20, b.cancelable)
+	cancelable := b.Cancelable(10)
+	must.Len(t, 10, cancelable)
+	must.Len(t, 10, b.cancelable)
+	must.Eq(t, 10, b.stats.TotalCancelable)
+
+	cancelable = b.Cancelable(20)
+	must.Len(t, 10, cancelable)
+	must.Len(t, 0, b.cancelable)
+	must.Eq(t, 0, b.stats.TotalCancelable)
+}
+
 // TestEvalBroker_IntegrationTest exercises the eval broker with realistic
 // workflows
 func TestEvalBroker_IntegrationTest(t *testing.T) {
@@ -1567,16 +1594,26 @@ func TestEvalBroker_IntegrationTest(t *testing.T) {
 	config := DefaultConfig()
 	config.NumSchedulers = 4
-	config.EvalReapCancelableInterval = time.Minute * 10
-	require.NoError(t, srv.Reload(config))
+	must.NoError(t, srv.Reload(config))
 
 	// assert that all but 2 evals were canceled and that the eval broker state
 	// has been cleared
-	require.Eventually(t, func() bool {
-		got := getEvalStatuses()
-		return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
-	}, 2*time.Second, time.Millisecond*100)
+	var got map[string]int
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.Timeout(5*time.Second),
+		wait.Gap(100*time.Millisecond),
+		wait.BoolFunc(func() bool {
+			got = getEvalStatuses()
+			return got[structs.EvalStatusComplete] == 2 &&
+				got[structs.EvalStatusCancelled] == 9
+		}),
+	),
+		must.Func(func() string {
+			return fmt.Sprintf("expected map[complete:2 canceled:9] within timeout, got: %v with broker status=%#v", got, getStats())
+		}),
+	)
 
 	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, TotalPending: 0, TotalCancelable: 0}, getStats())
diff --git a/nomad/leader.go b/nomad/leader.go
index 08c658a3245b..df501be435f9 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -1035,6 +1035,8 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
 	return wakeCh
 }
 
+const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10
+
 // cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
 // broker and updates their status to canceled.
 func cancelCancelableEvals(srv *Server) error {
@@ -1044,7 +1046,7 @@ func cancelCancelableEvals(srv *Server) error {
 	// We *can* send larger raft logs but rough benchmarks show that a smaller
 	// page size strikes a balance between throughput and time we block the FSM
 	// apply for other operations
-	cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
+	cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize)
 	if len(cancelable) > 0 {
 		for i, eval := range cancelable {
 			eval = eval.Copy()
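
For context on the locking change above, here is a minimal, self-contained sketch of the pattern involved. It is not part of the patch and not Nomad's API (the `broker` type and `takeBatch` method are illustrative names): a method that pops a batch off a shared slice mutates that slice, so it must hold the write lock of a `sync.RWMutex`; a read lock admits concurrent holders, and two goroutines reslicing the same field under `RLock` is a data race.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a stand-in for a broker holding a slice of cancelable items
// guarded by an RWMutex. (Illustrative only, not Nomad's implementation.)
type broker struct {
	l          sync.RWMutex
	cancelable []string
}

// takeBatch removes up to batchSize items from b.cancelable. Because it
// mutates b.cancelable, it takes the write lock; holding only b.l.RLock()
// here would let two goroutines reslice b.cancelable concurrently, which
// the race detector typically reports as a data race.
func (b *broker) takeBatch(batchSize int) []string {
	b.l.Lock()
	defer b.l.Unlock()

	if batchSize > len(b.cancelable) {
		batchSize = len(b.cancelable)
	}
	batch := b.cancelable[:batchSize]
	b.cancelable = b.cancelable[batchSize:]
	return batch
}

func main() {
	b := &broker{cancelable: []string{"eval-1", "eval-2", "eval-3", "eval-4"}}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.takeBatch(2) // safe: each call holds the write lock exclusively
		}()
	}
	wg.Wait()
	fmt.Println("remaining:", len(b.cancelable)) // 0
}
```

Running this with `go run -race` is quiet as written; swapping `Lock`/`Unlock` for `RLock`/`RUnlock` in `takeBatch` reproduces the kind of unsynchronized mutation the patch fixes, and the race detector will usually flag it.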