diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f9b8d7c63a23..66c90fdba46e 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "runtime" "time" @@ -99,7 +100,16 @@ func (p *planner) planApply() { // Snapshot the state so that we have a consistent view of the world // if no snapshot is available if waitCh == nil || snap == nil { - snap, err = p.fsm.State().Snapshot() + const timeout = 5 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + snap, err = p.fsm.State().SnapshotAfter(ctx, pending.plan.SnapshotIndex) + cancel() + if err == context.DeadlineExceeded { + p.logger.Error("timed out synchronizing to planner's index", + "timeout", timeout, "plan_index", pending.plan.SnapshotIndex) + err = fmt.Errorf("timed out after %s waiting for index=%d", + timeout, pending.plan.SnapshotIndex) + } if err != nil { p.logger.Error("failed to snapshot state", "error", err) pending.respond(nil, err) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index dee93eee733c..a89f541cce6c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8605,6 +8605,11 @@ type Plan struct { // lower priority jobs that are preempted. Preempted allocations are marked // as evicted. NodePreemptions map[string][]*Allocation + + // SnapshotIndex is the Raft index of the snapshot used to create the + // Plan. The leader will wait to evaluate the plan until its StateStore + // has reached at least this index. + SnapshotIndex uint64 } // AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the diff --git a/nomad/worker.go b/nomad/worker.go index 192a078d5dce..b29fa49ed5a3 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler. // Add the evaluation token to the plan plan.EvalToken = w.evalToken + // Add SnapshotIndex to ensure leader's StateStore processes the Plan + // at or after the index it was created. + plan.SnapshotIndex = w.snapshotIndex + // Normalize stopped and preempted allocs before RPC normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true) if normalizePlan { @@ -319,7 +323,7 @@ SUBMIT: } // Check if a state update is required. This could be required if we - // planning based on stale data, which is causing issues. For example, a + // planned based on stale data, which is causing issues. For example, a // node failure since the time we've started planning or conflicting task // allocations. var state scheduler.State