From 1d670f2d2466e8d3a278a8896ee9f10455fe4ea3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 6 Jun 2019 15:44:47 -0700 Subject: [PATCH] nomad: include snapshot index when submitting plans Plan application should use a state snapshot at or after the Raft index at which the plan was created, otherwise it risks being rejected based on stale data. This commit adds a Plan.SnapshotIndex which is set by workers when submitting a plan. SnapshotIndex is set to the Raft index of the snapshot the worker used to generate the plan. Plan.SnapshotIndex plays a similar role to PlanResult.RefreshIndex. While RefreshIndex informs workers their StateStore is behind the leader's, SnapshotIndex is a way to prevent the leader from using a StateStore behind the worker's. Plan.SnapshotIndex should be considered the *lower bound* index for consistently handling plan application. Plans must also be committed serially, so Plan N+1 should use a state snapshot containing Plan N. This is guaranteed for plans *after* the first plan after a leader election. The Raft barrier on leader election ensures the leader's StateStore has caught up to the log index at which it was elected. This guarantees its StateStore is at an index > lastPlanIndex. 
--- nomad/plan_apply.go | 12 +++++++++++- nomad/structs/structs.go | 5 +++++ nomad/worker.go | 6 +++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 92873d0d52ba..58107a9294ea 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "runtime" "time" @@ -99,7 +100,16 @@ func (p *planner) planApply() { // Snapshot the state so that we have a consistent view of the world // if no snapshot is available if waitCh == nil || snap == nil { - snap, err = p.fsm.State().Snapshot() + const timeout = 5 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + snap, err = p.fsm.State().SnapshotAfter(ctx, pending.plan.SnapshotIndex) + cancel() + if err == context.DeadlineExceeded { + p.logger.Error("timed out synchronizing to planner's index", + "timeout", timeout, "plan_index", pending.plan.SnapshotIndex) + err = fmt.Errorf("timed out after %s waiting for index=%d", + timeout, pending.plan.SnapshotIndex) + } if err != nil { p.logger.Error("failed to snapshot state", "error", err) pending.respond(nil, err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dee93eee733c..a89f541cce6c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8605,6 +8605,11 @@ type Plan struct { // lower priority jobs that are preempted. Preempted allocations are marked // as evicted. NodePreemptions map[string][]*Allocation + + // SnapshotIndex is the Raft index of the snapshot used to create the + // Plan. The leader will wait to evaluate the plan until its StateStore + // has reached at least this index. + SnapshotIndex uint64 } // AppendStoppedAlloc marks an allocation to be stopped. 
diff --git a/nomad/worker.go b/nomad/worker.go index 192a078d5dce..b29fa49ed5a3 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler. // Add the evaluation token to the plan plan.EvalToken = w.evalToken + // Add SnapshotIndex to ensure leader's StateStore processes the Plan + // at or after the index it was created. + plan.SnapshotIndex = w.snapshotIndex + // Normalize stopped and preempted allocs before RPC normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true) if normalizePlan { @@ -319,7 +323,7 @@ SUBMIT: } // Check if a state update is required. This could be required if we - // planning based on stale data, which is causing issues. For example, a + // planned based on stale data, which is causing issues. For example, a // node failure since the time we've started planning or conflicting task // allocations. var state scheduler.State