diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 92873d0d52ba..27368f4475ab 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -1,6 +1,7 @@
 package nomad

 import (
+	"context"
 	"fmt"
 	"runtime"
 	"time"
@@ -68,11 +69,22 @@ func newPlanner(s *Server) (*planner, error) {
 // but there are many of those and only a single plan verifier.
 //
 func (p *planner) planApply() {
-	// waitCh is used to track an outstanding application while snap
-	// holds an optimistic state which includes that plan application.
-	var waitCh chan struct{}
+	// planIndexCh is used to track an outstanding application and receive
+	// its committed index while snap holds an optimistic state which
+	// includes that plan application.
+	var planIndexCh chan uint64
 	var snap *state.StateSnapshot

+	// prevPlanResultIndex is the index when the last PlanResult was
+	// committed. Since only the last plan is optimistically applied to the
+	// snapshot, it's possible the current snapshot's and plan's indexes
+	// are less than the index the previous plan result was committed at.
+	// prevPlanResultIndex also guards against the previous plan committing
+	// during Dequeue, thus causing the snapshot containing the optimistic
+	// commit to be discarded and potentially evaluating the current plan
+	// against an index older than the previous plan was committed at.
+	var prevPlanResultIndex uint64
+
 	// Setup a worker pool with half the cores, with at least 1
 	poolSize := runtime.NumCPU() / 2
 	if poolSize == 0 {
@@ -88,18 +100,35 @@ func (p *planner) planApply() {
 			return
 		}

-		// Check if out last plan has completed
+		// If last plan has completed get a new snapshot
 		select {
-		case <-waitCh:
-			waitCh = nil
+		case idx := <-planIndexCh:
+			// Previous plan committed. Discard snapshot and ensure
+			// future snapshots include this plan. idx may be 0 if
+			// plan failed to apply, so use max(prev, idx)
+			prevPlanResultIndex = max(prevPlanResultIndex, idx)
+			planIndexCh = nil
 			snap = nil
 		default:
 		}

+		if snap != nil {
+			// If snapshot doesn't contain the previous plan
+			// result's index and the current plan's snapshot
+			// index, discard it and get a new one below.
+			minIndex := max(prevPlanResultIndex, pending.plan.SnapshotIndex)
+			if idx, err := snap.LatestIndex(); err != nil || idx < minIndex {
+				snap = nil
+			}
+		}
+
 		// Snapshot the state so that we have a consistent view of the world
-		// if no snapshot is available
-		if waitCh == nil || snap == nil {
-			snap, err = p.fsm.State().Snapshot()
+		// if no snapshot is available.
+		//  - planIndexCh will be nil if the previous plan result applied
+		//    during Dequeue
+		//  - snap will be nil if its index < max(prevIndex, curIndex)
+		if planIndexCh == nil || snap == nil {
+			snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
 			if err != nil {
 				p.logger.Error("failed to snapshot state", "error", err)
 				pending.respond(nil, err)
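The loop above only reuses its optimistically retained snapshot when that snapshot is at least as new as both the previously committed plan result and the index the incoming plan was created from; otherwise it falls through and fetches a fresh one. The standalone sketch below illustrates just that freshness check. It is not part of the patch, and the names (cachedSnapshotIsFresh, latestIndexer, fakeSnap) are invented for the example; only LatestIndex() mirrors the real snapshot API used above.

    package main

    import "fmt"

    // latestIndexer stands in for *state.StateSnapshot, which exposes
    // LatestIndex() in the real code.
    type latestIndexer interface {
    	LatestIndex() (uint64, error)
    }

    type fakeSnap uint64

    func (f fakeSnap) LatestIndex() (uint64, error) { return uint64(f), nil }

    // cachedSnapshotIsFresh reports whether an optimistically retained snapshot
    // may be reused for the next plan: its index must cover both the last
    // committed plan result and the index the plan was created from.
    func cachedSnapshotIsFresh(snap latestIndexer, prevPlanResultIndex, planSnapshotIndex uint64) bool {
    	minIndex := prevPlanResultIndex
    	if planSnapshotIndex > minIndex {
    		minIndex = planSnapshotIndex
    	}
    	idx, err := snap.LatestIndex()
    	return err == nil && idx >= minIndex
    }

    func main() {
    	fmt.Println(cachedSnapshotIsFresh(fakeSnap(100), 90, 95))  // true: snapshot covers both indexes
    	fmt.Println(cachedSnapshotIsFresh(fakeSnap(100), 120, 95)) // false: previous plan committed at a newer index
    }
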
@@ -123,11 +152,12 @@ func (p *planner) planApply() {

 		// Ensure any parallel apply is complete before starting the next one.
 		// This also limits how out of date our snapshot can be.
-		if waitCh != nil {
-			<-waitCh
-			snap, err = p.fsm.State().Snapshot()
+		if planIndexCh != nil {
+			idx := <-planIndexCh
+			prevPlanResultIndex = max(prevPlanResultIndex, idx)
+			snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
 			if err != nil {
-				p.logger.Error("failed to snapshot state", "error", err)
+				p.logger.Error("failed to update snapshot state", "error", err)
 				pending.respond(nil, err)
 				continue
 			}
@@ -141,12 +171,35 @@ func (p *planner) planApply() {
 			continue
 		}

-		// Respond to the plan in async
-		waitCh = make(chan struct{})
-		go p.asyncPlanWait(waitCh, future, result, pending)
+		// Respond to the plan in async; receive plan's committed index via chan
+		planIndexCh = make(chan uint64, 1)
+		go p.asyncPlanWait(planIndexCh, future, result, pending)
 	}
 }

+// snapshotMinIndex wraps StateStore.SnapshotMinIndex with a 5s timeout and converts
+// timeout errors to a more descriptive error message. The snapshot is guaranteed to
+// include both the previous plan and all objects referenced by the plan or
+// return an error.
+func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64) (*state.StateSnapshot, error) {
+	defer metrics.MeasureSince([]string{"nomad", "plan", "wait_for_index"}, time.Now())
+
+	// Minimum index the snapshot must include is the max of the previous
+	// plan result's and current plan's snapshot index.
+	minIndex := max(prevPlanResultIndex, planSnapshotIndex)
+
+	const timeout = 5 * time.Second
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
+	cancel()
+	if err == context.DeadlineExceeded {
+		return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
+			timeout, minIndex, prevPlanResultIndex, planSnapshotIndex)
+	}
+
+	return snap, err
+}
+
 // applyPlan is used to apply the plan result and to return the alloc index
 func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
 	// Setup the update request
@@ -306,21 +359,26 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
 	}
 }

-// asyncPlanWait is used to apply and respond to a plan async
-func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
+// asyncPlanWait is used to apply and respond to a plan async. On successful
+// commit the plan's index will be sent on the chan. On error the chan will be
+// closed.
+func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
 	result *structs.PlanResult, pending *pendingPlan) {
 	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
-	defer close(waitCh)

 	// Wait for the plan to apply
 	if err := future.Error(); err != nil {
 		p.logger.Error("failed to apply plan", "error", err)
 		pending.respond(nil, err)
+
+		// Close indexCh on error
+		close(indexCh)
 		return
 	}

 	// Respond to the plan
-	result.AllocIndex = future.Index()
+	index := future.Index()
+	result.AllocIndex = index

 	// If this is a partial plan application, we need to ensure the scheduler
 	// at least has visibility into any placements it made to avoid double placement.
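A note on the channel handoff introduced above: planIndexCh is buffered with capacity 1 so asyncPlanWait can hand back the committed index without blocking, and on failure the channel is closed so the receiver observes the zero value rather than hanging, which is why planApply folds the result in with max(prev, idx) and tolerates idx == 0. Below is a minimal, self-contained sketch of that one-shot pattern; applyOnce is an invented stand-in for asyncPlanWait, not code from the patch.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // applyOnce simulates asyncPlanWait: on success it sends the committed Raft
    // index on indexCh; on failure it closes the channel so the receiver observes
    // the zero value instead of blocking forever.
    func applyOnce(indexCh chan<- uint64, index uint64, err error) {
    	if err != nil {
    		close(indexCh)
    		return
    	}
    	indexCh <- index
    }

    func main() {
    	// Success: the receiver picks up the committed index.
    	okCh := make(chan uint64, 1) // capacity 1 so the sender never blocks
    	go applyOnce(okCh, 42, nil)
    	fmt.Println("committed index:", <-okCh) // 42

    	// Failure: the channel is closed and the receive yields 0, mirroring
    	// the "idx may be 0 if plan failed to apply" comment in planApply.
    	failCh := make(chan uint64, 1)
    	go applyOnce(failCh, 42, errors.New("raft apply failed"))
    	fmt.Println("committed index:", <-failCh) // 0
    }
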
@@ -330,6 +388,7 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
 		result.RefreshIndex = maxUint64(result.RefreshIndex, result.AllocIndex)
 	}
 	pending.respond(result, nil)
+	indexCh <- index
 }

 // evaluatePlan is used to determine what portions of a plan
@@ -619,3 +678,10 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 	fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true)
 	return fit, reason, err
 }
+
+func max(a, b uint64) uint64 {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 09c74286400a..af522d4483bd 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
 	return snap, nil
 }

-// SnapshotAfter is used to create a point in time snapshot where the index is
+// SnapshotMinIndex is used to create a state snapshot where the index is
 // guaranteed to be greater than or equal to the index parameter.
 //
 // Some server operations (such as scheduling) exchange objects via RPC
@@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
 //
 // Callers should maintain their own timer metric as the time this method
 // blocks indicates Raft log application latency relative to scheduling.
-func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
+func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
 	// Ported from work.go:waitForIndex prior to 0.9
 	const backoffBase = 20 * time.Millisecond
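Both callers added in this patch (planner.snapshotMinIndex above and Worker.snapshotMinIndex further below) follow the same pattern around the renamed StateStore.SnapshotMinIndex: bound the wait with a context deadline and translate context.DeadlineExceeded into an error that names the index being waited for. The sketch below shows that calling pattern in isolation; snapshot, minIndexSnapshotter, neverReady, and waitForIndex are invented stand-ins rather than Nomad's real types.

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // snapshot stands in for *state.StateSnapshot.
    type snapshot struct{ index uint64 }

    // minIndexSnapshotter mirrors the StateStore.SnapshotMinIndex signature
    // introduced by this patch.
    type minIndexSnapshotter interface {
    	SnapshotMinIndex(ctx context.Context, index uint64) (*snapshot, error)
    }

    // neverReady is a fake store whose index never advances, so every call waits
    // out the context and returns ctx.Err(), as the real method does on timeout.
    type neverReady struct{}

    func (neverReady) SnapshotMinIndex(ctx context.Context, index uint64) (*snapshot, error) {
    	<-ctx.Done()
    	return nil, ctx.Err()
    }

    // waitForIndex is a hypothetical caller: it bounds the wait with a deadline
    // and converts DeadlineExceeded into a descriptive error.
    func waitForIndex(store minIndexSnapshotter, index uint64, timeout time.Duration) (*snapshot, error) {
    	ctx, cancel := context.WithTimeout(context.Background(), timeout)
    	defer cancel()

    	snap, err := store.SnapshotMinIndex(ctx, index)
    	if errors.Is(err, context.DeadlineExceeded) {
    		return nil, fmt.Errorf("timed out after %s waiting for index=%d", timeout, index)
    	}
    	return snap, err
    }

    func main() {
    	_, err := waitForIndex(neverReady{}, 123, 50*time.Millisecond)
    	fmt.Println(err) // timed out after 50ms waiting for index=123
    }
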
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index d21e4bbdefbc..4aa6e949241d 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
 	require.Nil(denormalizedAllocs)
 }

-// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
+// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks
 // until the StateStore's latest index is >= the requested index.
-func TestStateStore_SnapshotAfter_OK(t *testing.T) {
+func TestStateStore_SnapshotMinIndex_OK(t *testing.T) {
 	t.Parallel()

 	s := testStateStore(t)
@@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
 	node := mock.Node()
 	require.NoError(t, s.UpsertNode(index+1, node))

-	// Assert SnapshotAfter returns immediately if index < latest index
+	// Assert SnapshotMinIndex returns immediately if index < latest index
 	ctx, cancel := context.WithTimeout(context.Background(), 0)
-	snap, err := s.SnapshotAfter(ctx, index)
+	snap, err := s.SnapshotMinIndex(ctx, index)
 	cancel()
 	require.NoError(t, err)

@@ -7162,9 +7162,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
 		require.Fail(t, "snapshot index should be greater than index")
 	}

-	// Assert SnapshotAfter returns immediately if index == latest index
+	// Assert SnapshotMinIndex returns immediately if index == latest index
 	ctx, cancel = context.WithTimeout(context.Background(), 0)
-	snap, err = s.SnapshotAfter(ctx, index+1)
+	snap, err = s.SnapshotMinIndex(ctx, index+1)
 	cancel()
 	require.NoError(t, err)

@@ -7172,14 +7172,14 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, snapIndex, index+1)

-	// Assert SnapshotAfter blocks if index > latest index
+	// Assert SnapshotMinIndex blocks if index > latest index
 	errCh := make(chan error, 1)
 	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 	go func() {
 		defer close(errCh)
 		waitIndex := index + 2
-		snap, err := s.SnapshotAfter(ctx, waitIndex)
+		snap, err := s.SnapshotMinIndex(ctx, waitIndex)
 		if err != nil {
 			errCh <- err
 			return
@@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
 	case err := <-errCh:
 		require.NoError(t, err)
 	case <-time.After(5 * time.Second):
-		require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
+		require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock")
 	}
 }

-// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
+// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex
 // returns an error if the desired index is not reached within the deadline.
-func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
+func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) {
 	t.Parallel()

 	s := testStateStore(t)
 	index, err := s.LatestIndex()
 	require.NoError(t, err)

-	// Assert SnapshotAfter blocks if index > latest index
+	// Assert SnapshotMinIndex blocks if index > latest index
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer cancel()
-	snap, err := s.SnapshotAfter(ctx, index+1)
+	snap, err := s.SnapshotMinIndex(ctx, index+1)
 	require.EqualError(t, err, context.DeadlineExceeded.Error())
 	require.Nil(t, snap)
 }
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index dee93eee733c..a89f541cce6c 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -8605,6 +8605,11 @@ type Plan struct {
 	// lower priority jobs that are preempted. Preempted allocations are marked
 	// as evicted.
 	NodePreemptions map[string][]*Allocation
+
+	// SnapshotIndex is the Raft index of the snapshot used to create the
+	// Plan. The leader will wait to evaluate the plan until its StateStore
+	// has reached at least this index.
+	SnapshotIndex uint64
 }

 // AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
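The new SnapshotIndex field is the scheduler-to-leader handshake: the worker records the index of the snapshot it planned against, and the leader's planner must not evaluate the plan against state older than that, since objects the plan references could otherwise be missing. Below is a tiny illustrative guard expressing that contract; the stripped-down plan type and okToEvaluate are examples only, not Nomad code.

    package main

    import "fmt"

    // plan is a stripped-down stand-in for structs.Plan; only the new field
    // from the patch is shown.
    type plan struct {
    	SnapshotIndex uint64
    }

    // okToEvaluate is a hypothetical guard: the evaluating state store must
    // have reached at least the index the plan was created from.
    func okToEvaluate(stateIndex uint64, p *plan) error {
    	if stateIndex < p.SnapshotIndex {
    		return fmt.Errorf("state index %d is behind plan snapshot index %d; wait for the state store to catch up",
    			stateIndex, p.SnapshotIndex)
    	}
    	return nil
    }

    func main() {
    	p := &plan{SnapshotIndex: 120}
    	fmt.Println(okToEvaluate(118, p)) // error: state is stale
    	fmt.Println(okToEvaluate(125, p)) // <nil>: safe to evaluate
    }
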
diff --git a/nomad/worker.go b/nomad/worker.go
index 192a078d5dce..06a8dc34db16 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -118,7 +118,7 @@ func (w *Worker) run() {
 		}

 		// Wait for the raft log to catchup to the evaluation
-		snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
+		snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
 		if err != nil {
 			w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
 			w.sendAck(eval.ID, token, false)
@@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
 	}
 }

-// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
-func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
+// snapshotMinIndex times calls to StateStore.SnapshotMinIndex which may block.
+func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
 	start := time.Now()
 	ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
-	snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
+	snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
 	cancel()
 	metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)

@@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
 	// Add the evaluation token to the plan
 	plan.EvalToken = w.evalToken

+	// Add SnapshotIndex to ensure leader's StateStore processes the Plan
+	// at or after the index at which it was created.
+	plan.SnapshotIndex = w.snapshotIndex
+
 	// Normalize stopped and preempted allocs before RPC
 	normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
 	if normalizePlan {
@@ -319,7 +323,7 @@ SUBMIT:
 	}

 	// Check if a state update is required. This could be required if we
-	// planning based on stale data, which is causing issues. For example, a
+	// planned based on stale data, which is causing issues. For example, a
 	// node failure since the time we've started planning or conflicting task
 	// allocations.
 	var state scheduler.State
@@ -328,7 +332,7 @@ SUBMIT:
 		w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)

 		var err error
-		state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
+		state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit)
 		if err != nil {
 			return nil, nil, err
 		}
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index 70b0a1c99150..4be5041daefa 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) {

 	// Wait for a future index
 	w := &Worker{srv: s1, logger: s1.logger}
-	snap, err := w.snapshotAfter(index+1, time.Second)
+	snap, err := w.snapshotMinIndex(index+1, time.Second)
 	require.NoError(t, err)
 	require.NotNil(t, snap)

@@ -306,7 +306,7 @@ func TestWorker_waitForIndex(t *testing.T) {
 	// Cause a timeout
 	waitIndex := index + 100
 	timeout := 10 * time.Millisecond
-	snap, err = w.snapshotAfter(index+100, timeout)
+	snap, err = w.snapshotMinIndex(index+100, timeout)
 	require.Nil(t, snap)
 	require.EqualError(t, err, fmt.Sprintf("timed out after %s waiting for index=%d",
 		timeout, waitIndex))
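Taken together, the changes establish the following flow: the worker stamps each plan with the snapshot index it scheduled against, and the leader's planner waits, via SnapshotMinIndex with a bounded timeout, until its own state store has reached the maximum of that index and the index of the last committed plan result before evaluating the plan. The compact simulation below walks through that handshake end to end; every name in it is an illustrative stand-in, and the polling loop only loosely follows the backoff loop referenced in state_store.go.

    package main

    import (
    	"context"
    	"fmt"
    	"sync/atomic"
    	"time"
    )

    // stateStore mimics only the behaviour relevant here: a monotonically
    // advancing index plus a SnapshotMinIndex-style bounded wait.
    type stateStore struct{ index uint64 }

    // snapshotMinIndex polls until the store reaches minIndex or ctx expires.
    func (s *stateStore) snapshotMinIndex(ctx context.Context, minIndex uint64) (uint64, error) {
    	for {
    		if idx := atomic.LoadUint64(&s.index); idx >= minIndex {
    			return idx, nil
    		}
    		select {
    		case <-ctx.Done():
    			return 0, ctx.Err()
    		case <-time.After(20 * time.Millisecond):
    		}
    	}
    }

    func max(a, b uint64) uint64 {
    	if a > b {
    		return a
    	}
    	return b
    }

    func main() {
    	leader := &stateStore{index: 90} // leader's FSM is slightly behind

    	planSnapshotIndex := uint64(100)  // worker scheduled against index 100
    	prevPlanResultIndex := uint64(95) // last plan result committed at index 95

    	// The planner must not evaluate the plan before max(95, 100) = 100.
    	minIndex := max(prevPlanResultIndex, planSnapshotIndex)

    	// Simulate Raft log application catching up in the background.
    	go func() {
    		time.Sleep(50 * time.Millisecond)
    		atomic.StoreUint64(&leader.index, 101)
    	}()

    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()

    	idx, err := leader.snapshotMinIndex(ctx, minIndex)
    	fmt.Println(idx, err) // 101 <nil>: the plan can be evaluated safely
    }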