From 1df06bddda476e3b5e3208a53a2c15d1eb25c15e Mon Sep 17 00:00:00 2001 From: James Rasell Date: Wed, 29 Jun 2022 09:45:11 +0100 Subject: [PATCH] fsm: check schedConfig when operator is deleting eval. --- client/client_test.go | 2 +- nomad/core_sched_test.go | 2 +- nomad/eval_endpoint.go | 5 ++- nomad/eval_endpoint_test.go | 13 +++++-- nomad/fsm.go | 2 +- nomad/node_endpoint_test.go | 2 +- nomad/state/state_store.go | 21 +++++++++- nomad/state/state_store_test.go | 43 ++++++++++++++++++++- nomad/structs/structs.go | 6 +++ nomad/volumewatcher/volumes_watcher_test.go | 2 +- 10 files changed, 84 insertions(+), 14 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 8750b9531e91..d5dafab3a2cc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -508,7 +508,7 @@ func TestClient_WatchAllocs(t *testing.T) { }) // Delete one allocation - if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil { + if err := state.DeleteEval(103, nil, []string{alloc1.ID}, false); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 4b6f393b751a..76aa6eb29b92 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2388,7 +2388,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { require.NoError(t, err) index, _ = store.LatestIndex() index++ - err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}) + err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}, false) require.NoError(t, err) // Create a core scheduler and attempt the volume claim GC diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 01aef7cd36fd..111ccf81ba6d 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -483,8 +483,9 @@ func (e *Eval) Delete( // avoids adding new Raft messages types and follows the existing reap // flow. raftReq := structs.EvalReapRequest{ - Evals: args.EvalIDs, - WriteRequest: args.WriteRequest, + Evals: args.EvalIDs, + UserInitiated: true, + WriteRequest: args.WriteRequest, } // Update via Raft. diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 1dfbbe5f4a15..4d49d3516072 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -202,7 +202,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) { // Eval delete triggers watches time.AfterFunc(100*time.Millisecond, func() { - err := state.DeleteEval(300, []string{eval2.ID}, []string{}) + err := state.DeleteEval(300, []string{eval2.ID}, []string{}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -758,9 +758,16 @@ func TestEvalEndpoint_Delete(t *testing.T) { codec := rpcClient(t, testServer) testutil.WaitForLeader(t, testServer.RPC) - // Pause the eval broker. + // Pause the eval broker and update the scheduler config. testServer.evalBroker.SetEnabled(false) + _, schedulerConfig, err := testServer.fsm.State().SchedulerConfig() + require.NoError(t, err) + require.NotNil(t, schedulerConfig) + + schedulerConfig.PauseEvalBroker = true + require.NoError(t, testServer.fsm.State().SchedulerSetConfig(10, schedulerConfig)) + // Create and upsert an evaluation. mockEval := mock.Eval() require.NoError(t, testServer.fsm.State().UpsertEvals( @@ -1333,7 +1340,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) { // Eval deletion triggers watches time.AfterFunc(100*time.Millisecond, func() { - if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil { + if err := state.DeleteEval(3, []string{eval.ID}, nil, false); err != nil { t.Fatalf("err: %v", err) } }) diff --git a/nomad/fsm.go b/nomad/fsm.go index 11395e428d09..e686e211b0ac 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -787,7 +787,7 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil { + if err := n.state.DeleteEval(index, req.Evals, req.Allocs, req.UserInitiated); err != nil { n.logger.Error("DeleteEval failed", "error", err) return err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 3232eb923334..b0e35ad283dd 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2265,7 +2265,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { // Delete an allocation time.AfterFunc(100*time.Millisecond, func() { - assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID})) + assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}, false)) }) req.QueryOptions.MinQueryIndex = 150 diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c749097daa3e..75f3526f9be5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2,6 +2,7 @@ package state import ( "context" + "errors" "fmt" "reflect" "sort" @@ -3107,10 +3108,22 @@ func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string } // DeleteEval is used to delete an evaluation -func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { +func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitiated bool) error { txn := s.db.WriteTxn(index) defer txn.Abort() + // If this deletion has been initiated by an operator, ensure the eval + // broker is paused. + if userInitiated { + _, schedConfig, err := s.schedulerConfigTxn(txn) + if err != nil { + return err + } + if schedConfig == nil || !schedConfig.PauseEvalBroker { + return errors.New("eval broker is enabled; eval broker must be paused to delete evals") + } + } + jobs := make(map[structs.NamespacedID]string, len(evals)) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) @@ -5871,9 +5884,13 @@ func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool { func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { tx := s.db.ReadTxn() defer tx.Abort() + return s.schedulerConfigTxn(tx) +} + +func (s *StateStore) schedulerConfigTxn(txn *txn) (uint64, *structs.SchedulerConfiguration, error) { // Get the scheduler config - c, err := tx.First("scheduler_config", "id") + c, err := txn.First("scheduler_config", "id") if err != nil { return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 33e0748cd390..971e7ff3e90d 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4233,7 +4233,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { t.Fatalf("err: %v", err) } - err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}) + err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -4341,7 +4341,7 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) { t.Fatalf("bad: %v", err) } - err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}) + err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -4373,6 +4373,45 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) { } } +func TestStateStore_DeleteEval_UserInitiated(t *testing.T) { + ci.Parallel(t) + + testState := testStateStore(t) + + // Upsert a scheduler config object, so we have something to check and + // modify. + schedulerConfig := structs.SchedulerConfiguration{PauseEvalBroker: false} + require.NoError(t, testState.SchedulerSetConfig(10, &schedulerConfig)) + + // Generate some mock evals and upsert these into state. + mockEval1 := mock.Eval() + mockEval2 := mock.Eval() + require.NoError(t, testState.UpsertEvals( + structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval1, mockEval2})) + + mockEvalIDs := []string{mockEval1.ID, mockEval2.ID} + + // Try and delete the evals without pausing the eval broker. + err := testState.DeleteEval(30, mockEvalIDs, []string{}, true) + require.ErrorContains(t, err, "eval broker is enabled") + + // Pause the eval broker on the scheduler config, and try deleting the + // evals again. + schedulerConfig.PauseEvalBroker = true + require.NoError(t, testState.SchedulerSetConfig(30, &schedulerConfig)) + + require.NoError(t, testState.DeleteEval(40, mockEvalIDs, []string{}, true)) + + ws := memdb.NewWatchSet() + mockEval1Lookup, err := testState.EvalByID(ws, mockEval1.ID) + require.NoError(t, err) + require.Nil(t, mockEval1Lookup) + + mockEval2Lookup, err := testState.EvalByID(ws, mockEval1.ID) + require.NoError(t, err) + require.Nil(t, mockEval2Lookup) +} + func TestStateStore_EvalsByJob(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ca5941960043..65a8a523c11c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -830,6 +830,12 @@ type EvalUpdateRequest struct { type EvalReapRequest struct { Evals []string Allocs []string + + // UserInitiated tracks whether this reap request is the result of an + // operator request. If this is true, the FSM needs to ensure the eval + // broker is paused as the request can include non-terminal allocations. + UserInitiated bool + WriteRequest } diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 97a83a9b538a..dc4c60ed8331 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -108,7 +108,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // allocation is now invalid index++ - err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}) + err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}, false) require.NoError(t, err) // emit a GC so that we have a volume change that's dropped