Skip to content

Commit

Permalink
fsm: check schedConfig when operator is deleting eval.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Jun 29, 2022
1 parent aa112fd commit aa618d8
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 13 deletions.
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func TestClient_WatchAllocs(t *testing.T) {
})

// Delete one allocation
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
if err := state.DeleteEval(103, nil, []string{alloc1.ID}, false); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2388,7 +2388,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
require.NoError(t, err)
index, _ = store.LatestIndex()
index++
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID})
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}, false)
require.NoError(t, err)

// Create a core scheduler and attempt the volume claim GC
Expand Down
5 changes: 3 additions & 2 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,9 @@ func (e *Eval) Delete(
// avoids adding new Raft message types and follows the existing reap
// flow.
raftReq := structs.EvalReapRequest{
Evals: args.EvalIDs,
WriteRequest: args.WriteRequest,
Evals: args.EvalIDs,
UserInitiated: true,
WriteRequest: args.WriteRequest,
}

// Update via Raft.
Expand Down
4 changes: 2 additions & 2 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {

// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteEval(300, []string{eval2.ID}, []string{})
err := state.DeleteEval(300, []string{eval2.ID}, []string{}, false)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) {

// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
if err := state.DeleteEval(3, []string{eval.ID}, nil, false); err != nil {
t.Fatalf("err: %v", err)
}
})
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil {
if err := n.state.DeleteEval(index, req.Evals, req.Allocs, req.UserInitiated); err != nil {
n.logger.Error("DeleteEval failed", "error", err)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2265,7 +2265,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {

// Delete an allocation
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}))
assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}, false))
})

req.QueryOptions.MinQueryIndex = 150
Expand Down
21 changes: 19 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -3107,10 +3108,22 @@ func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string
}

// DeleteEval is used to delete an evaluation
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitiated bool) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

// If this deletion has been initiated by an operator, ensure the eval
// broker is paused.
if userInitiated {
_, schedConfig, err := s.schedulerConfigTxn(txn)
if err != nil {
return err
}
if schedConfig == nil || !schedConfig.PauseEvalBroker {
return errors.New("eval broker is enabled; eval broker must be paused to delete evals")
}
}

jobs := make(map[structs.NamespacedID]string, len(evals))
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
Expand Down Expand Up @@ -5871,9 +5884,13 @@ func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool {
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
	// Open a read transaction for the lookup and hand it to the shared
	// transaction-scoped helper; the transaction is always aborted on return.
	readTxn := s.db.ReadTxn()
	defer readTxn.Abort()

	return s.schedulerConfigTxn(readTxn)
}

func (s *StateStore) schedulerConfigTxn(txn *txn) (uint64, *structs.SchedulerConfiguration, error) {

// Get the scheduler config
c, err := tx.First("scheduler_config", "id")
c, err := txn.First("scheduler_config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
}
Expand Down
43 changes: 41 additions & 2 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4233,7 +4233,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
t.Fatalf("err: %v", err)
}

err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID})
err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}, false)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -4341,7 +4341,7 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) {
t.Fatalf("bad: %v", err)
}

err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID})
err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}, false)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -4373,6 +4373,45 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) {
}
}

// TestStateStore_DeleteEval_UserInitiated verifies that a user-initiated
// DeleteEval call is rejected while the scheduler configuration does not have
// the eval broker paused, and that the same call succeeds and removes every
// requested eval once the broker has been paused.
func TestStateStore_DeleteEval_UserInitiated(t *testing.T) {
	ci.Parallel(t)

	testState := testStateStore(t)

	// Upsert a scheduler config object, so we have something to check and
	// modify.
	schedulerConfig := structs.SchedulerConfiguration{PauseEvalBroker: false}
	require.NoError(t, testState.SchedulerSetConfig(10, &schedulerConfig))

	// Generate some mock evals and upsert these into state.
	mockEval1 := mock.Eval()
	mockEval2 := mock.Eval()
	require.NoError(t, testState.UpsertEvals(
		structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval1, mockEval2}))

	mockEvalIDs := []string{mockEval1.ID, mockEval2.ID}

	// Try and delete the evals without pausing the eval broker.
	err := testState.DeleteEval(30, mockEvalIDs, []string{}, true)
	require.ErrorContains(t, err, "eval broker is enabled")

	// Pause the eval broker on the scheduler config, and try deleting the
	// evals again.
	schedulerConfig.PauseEvalBroker = true
	require.NoError(t, testState.SchedulerSetConfig(30, &schedulerConfig))

	require.NoError(t, testState.DeleteEval(40, mockEvalIDs, []string{}, true))

	// Both evals must now be absent from state. Each lookup uses its own eval
	// ID so that the deletion of the second eval is actually verified.
	ws := memdb.NewWatchSet()
	mockEval1Lookup, err := testState.EvalByID(ws, mockEval1.ID)
	require.NoError(t, err)
	require.Nil(t, mockEval1Lookup)

	mockEval2Lookup, err := testState.EvalByID(ws, mockEval2.ID)
	require.NoError(t, err)
	require.Nil(t, mockEval2Lookup)
}

func TestStateStore_EvalsByJob(t *testing.T) {
ci.Parallel(t)

Expand Down
6 changes: 6 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,12 @@ type EvalUpdateRequest struct {
type EvalReapRequest struct {
// Evals lists the evaluations to remove; Eval.Delete populates it from the
// request's EvalIDs.
Evals []string
// Allocs presumably lists allocation IDs to remove alongside the evals
// (TODO confirm against callers that populate it).
Allocs []string

// UserInitiated tracks whether this reap request is the result of an
// operator request. If this is true, the FSM needs to ensure the eval
// broker is paused as the request can include non-terminal allocations.
UserInitiated bool

// WriteRequest is embedded; Eval.Delete copies it from the incoming request
// arguments.
WriteRequest
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {

// allocation is now invalid
index++
err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}, false)
require.NoError(t, err)

// emit a GC so that we have a volume change that's dropped
Expand Down

0 comments on commit aa618d8

Please sign in to comment.