
Merge pull request #5791 from hashicorp/b-plan-snapshotindex
nomad: include snapshot index when submitting plans
schmichael committed Jul 17, 2019
2 parents b352af9 + 887b49e commit bcfb39d
Showing 6 changed files with 118 additions and 43 deletions.
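
Before the file-by-file diff, a short sketch of the coordination pattern this commit introduces in planApply. This is a hedged, self-contained illustration, not Nomad code: the names applyLoop, plans, indexCh, and prevIndex are invented here. A single outstanding asynchronous apply reports the Raft index it committed at over a buffered channel, and the loop takes max(prevIndex, planSnapshotIndex) to decide the minimum index the next plan must be evaluated against.

package main

import (
	"fmt"
	"time"
)

func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// applyLoop mirrors the planApply structure: drain a finished apply without
// blocking, wait for any still-outstanding apply, compute the minimum
// snapshot index, then kick off the next apply asynchronously.
func applyLoop(plans <-chan uint64) {
	var indexCh chan uint64 // nil while no apply is outstanding
	var prevIndex uint64    // index of the last committed plan result

	for planSnapshotIndex := range plans {
		// Non-blocking drain; a nil channel never fires in a select.
		select {
		case idx := <-indexCh:
			prevIndex = max(prevIndex, idx) // idx is 0 if the apply failed
			indexCh = nil
		default:
		}

		// Ensure any parallel apply completes before starting the next.
		if indexCh != nil {
			prevIndex = max(prevIndex, <-indexCh)
			indexCh = nil
		}

		// The real code waits for a snapshot at or beyond this index.
		minIndex := max(prevIndex, planSnapshotIndex)
		fmt.Println("evaluating plan against snapshot index >=", minIndex)

		// Capacity 1 lets the apply goroutine send its committed index
		// and exit even if this loop has already moved on.
		indexCh = make(chan uint64, 1)
		go func(ch chan<- uint64, committedAt uint64) {
			time.Sleep(10 * time.Millisecond) // stand-in for the Raft apply
			ch <- committedAt
		}(indexCh, minIndex+1)
	}
}

func main() {
	plans := make(chan uint64, 3)
	for i := uint64(1); i <= 3; i++ {
		plans <- i
	}
	close(plans)
	applyLoop(plans)
	time.Sleep(50 * time.Millisecond) // let the last toy apply finish
}

Running it prints one "evaluating plan against snapshot index >= N" line per plan, with N advancing as each apply commits.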
106 changes: 86 additions & 20 deletions nomad/plan_apply.go
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"runtime"
"time"
@@ -68,11 +69,22 @@ func newPlanner(s *Server) (*planner, error) {
// but there are many of those and only a single plan verifier.
//
func (p *planner) planApply() {
// waitCh is used to track an outstanding application while snap
// holds an optimistic state which includes that plan application.
var waitCh chan struct{}
// planIndexCh is used to track an outstanding application and receive
// its committed index while snap holds an optimistic state which
// includes that plan application.
var planIndexCh chan uint64
var snap *state.StateSnapshot

// prevPlanResultIndex is the index at which the last PlanResult was
// committed. Since only the last plan is optimistically applied to the
// snapshot, it's possible for the current snapshot's and plan's indexes
// to be less than the index at which the previous plan result was
// committed. prevPlanResultIndex also guards against the previous plan
// committing during Dequeue, which would cause the snapshot containing
// the optimistic commit to be discarded and could leave the current plan
// evaluated against an index older than the previous plan's commit index.
var prevPlanResultIndex uint64

// Setup a worker pool with half the cores, with at least 1
poolSize := runtime.NumCPU() / 2
if poolSize == 0 {
@@ -88,18 +100,35 @@ func (p *planner) planApply() {
return
}

// Check if our last plan has completed
// If the last plan has completed, get a new snapshot
select {
case <-waitCh:
waitCh = nil
case idx := <-planIndexCh:
// Previous plan committed. Discard snapshot and ensure
// future snapshots include this plan. idx may be 0 if
// plan failed to apply, so use max(prev, idx)
prevPlanResultIndex = max(prevPlanResultIndex, idx)
planIndexCh = nil
snap = nil
default:
}

if snap != nil {
// If the snapshot doesn't contain both the previous plan
// result's index and the current plan's snapshot index,
// discard it and get a new one below.
minIndex := max(prevPlanResultIndex, pending.plan.SnapshotIndex)
if idx, err := snap.LatestIndex(); err != nil || idx < minIndex {
snap = nil
}
}

// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = p.fsm.State().Snapshot()
// if no snapshot is available.
// - planIndexCh will be nil if the previous plan result applied
// during Dequeue
// - snap will be nil if its index < max(prevIndex, curIndex)
if planIndexCh == nil || snap == nil {
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
@@ -123,11 +152,12 @@

// Ensure any parallel apply is complete before starting the next one.
// This also limits how out of date our snapshot can be.
if waitCh != nil {
<-waitCh
snap, err = p.fsm.State().Snapshot()
if planIndexCh != nil {
idx := <-planIndexCh
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
p.logger.Error("failed to update snapshot state", "error", err)
pending.respond(nil, err)
continue
}
@@ -141,12 +171,35 @@
continue
}

// Respond to the plan in async
waitCh = make(chan struct{})
go p.asyncPlanWait(waitCh, future, result, pending)
// Respond to the plan asynchronously; receive the plan's committed index via planIndexCh
planIndexCh = make(chan uint64, 1)
go p.asyncPlanWait(planIndexCh, future, result, pending)
}
}

// snapshotMinIndex wraps SnapshotMinIndex with a 5s timeout and converts
// timeout errors to a more descriptive error message. The returned snapshot
// is guaranteed to include both the previous plan result and all objects
// referenced by the plan; otherwise an error is returned.
func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64) (*state.StateSnapshot, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "wait_for_index"}, time.Now())

// Minimum index the snapshot must include is the max of the previous
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
timeout, minIndex, prevPlanResultIndex, planSnapshotIndex)
}

return snap, err
}

// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Setup the update request
@@ -306,21 +359,26 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
}
}

// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
// asyncPlanWait is used to apply and respond to a plan asynchronously. On
// successful commit the plan's index is sent on the chan; on error the chan
// is closed.
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(waitCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

// Respond to the plan
result.AllocIndex = future.Index()
index := future.Index()
result.AllocIndex = index

// If this is a partial plan application, we need to ensure the scheduler
// at least has visibility into any placements it made to avoid double placement.
@@ -330,6 +388,7 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result.RefreshIndex = maxUint64(result.RefreshIndex, result.AllocIndex)
}
pending.respond(result, nil)
indexCh <- index
}

// evaluatePlan is used to determine what portions of a plan
@@ -619,3 +678,10 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true)
return fit, reason, err
}

func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
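
Two details of the new flow above are easy to miss: planIndexCh is created with capacity 1, so asyncPlanWait can send the committed index and return without waiting for planApply to receive it; and a failed apply closes the channel rather than sending, so the receive yields a zero index, which is why the loop advances prevPlanResultIndex via max(prevPlanResultIndex, idx) instead of a plain assignment.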
4 changes: 2 additions & 2 deletions nomad/state/state_store.go
@@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}

// SnapshotAfter is used to create a point in time snapshot where the index is
// SnapshotMinIndex is used to create a state snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
@@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
//
// Callers should maintain their own timer metric as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
// Ported from work.go:waitForIndex prior to 0.9

const backoffBase = 20 * time.Millisecond
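
The body of SnapshotMinIndex is collapsed in this diff beyond the backoffBase constant, so the following is only a hedged sketch of how a wait-for-index loop of this shape can be written. The names waitForIndex and latest, and the backoff cap, are assumptions for illustration; only backoffBase matches the diff.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForIndex polls the store's latest index with a capped exponential
// backoff until it reaches the requested index or the context expires.
func waitForIndex(ctx context.Context, latest func() uint64, index uint64) error {
	const backoffBase = 20 * time.Millisecond // matches the constant in the diff
	const backoffLimit = 1 * time.Second      // cap is an assumption

	backoff := backoffBase
	for {
		if latest() >= index {
			return nil // safe to take a snapshot now
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // context.DeadlineExceeded on timeout
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > backoffLimit {
			backoff = backoffLimit
		}
	}
}

func main() {
	var idx atomic.Uint64
	go func() { // simulate the Raft log catching up
		time.Sleep(100 * time.Millisecond)
		idx.Store(42)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	fmt.Println(waitForIndex(ctx, idx.Load, 42)) // <nil>
}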
26 changes: 13 additions & 13 deletions nomad/state/state_store_test.go
@@ -7134,9 +7134,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
require.Nil(denormalizedAllocs)
}

// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks
// until the StateStore's latest index is >= the requested index.
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
func TestStateStore_SnapshotMinIndex_OK(t *testing.T) {
t.Parallel()

s := testStateStore(t)
@@ -7146,9 +7146,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
node := mock.Node()
require.NoError(t, s.UpsertNode(index+1, node))

// Assert SnapshotAfter returns immediately if index < latest index
// Assert SnapshotMinIndex returns immediately if index < latest index
ctx, cancel := context.WithTimeout(context.Background(), 0)
snap, err := s.SnapshotAfter(ctx, index)
snap, err := s.SnapshotMinIndex(ctx, index)
cancel()
require.NoError(t, err)

@@ -7158,24 +7158,24 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
require.Fail(t, "snapshot index should be greater than index")
}

// Assert SnapshotAfter returns immediately if index == latest index
// Assert SnapshotMinIndex returns immediately if index == latest index
ctx, cancel = context.WithTimeout(context.Background(), 0)
snap, err = s.SnapshotAfter(ctx, index+1)
snap, err = s.SnapshotMinIndex(ctx, index+1)
cancel()
require.NoError(t, err)

snapIndex, err = snap.LatestIndex()
require.NoError(t, err)
require.Equal(t, snapIndex, index+1)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
errCh := make(chan error, 1)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
defer close(errCh)
waitIndex := index + 2
snap, err := s.SnapshotAfter(ctx, waitIndex)
snap, err := s.SnapshotMinIndex(ctx, waitIndex)
if err != nil {
errCh <- err
return
@@ -7207,23 +7207,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(5 * time.Second):
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock")
}
}

// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex
// returns an error if the desired index is not reached within the deadline.
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) {
t.Parallel()

s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
snap, err := s.SnapshotAfter(ctx, index+1)
snap, err := s.SnapshotMinIndex(ctx, index+1)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, snap)
}
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
@@ -8613,6 +8613,11 @@ type Plan struct {
// lower priority jobs that are preempted. Preempted allocations are marked
// as evicted.
NodePreemptions map[string][]*Allocation

// SnapshotIndex is the Raft index of the snapshot used to create the
// Plan. The leader will wait to evaluate the plan until its StateStore
// has reached at least this index.
SnapshotIndex uint64
}

// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
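
To make the new field concrete, here is a hedged end-to-end sketch of the handshake SnapshotIndex enables between a scheduling worker and the leader. The Plan and Store types below are illustrative stand-ins, not Nomad's structs:

package main

import "fmt"

// Hypothetical stand-ins for illustration only; Nomad's real types live in
// nomad/structs and nomad/state.
type Plan struct{ SnapshotIndex uint64 }

type Store struct{ index uint64 }

func (s *Store) LatestIndex() uint64 { return s.index }

// Worker side: stamp the plan with the index of the snapshot it was
// scheduled against before submitting it to the leader.
func makePlan(s *Store) *Plan {
	return &Plan{SnapshotIndex: s.LatestIndex()}
}

// Leader side: the minimum index the leader's snapshot must reach before
// it may evaluate the plan, i.e. the max of the previous plan result's
// commit index and the plan's own snapshot index.
func minEvalIndex(prevPlanResultIndex uint64, p *Plan) uint64 {
	if prevPlanResultIndex > p.SnapshotIndex {
		return prevPlanResultIndex
	}
	return p.SnapshotIndex
}

func main() {
	worker := &Store{index: 100}
	plan := makePlan(worker)
	fmt.Println(minEvalIndex(97, plan))  // 100: wait for the plan's snapshot
	fmt.Println(minEvalIndex(103, plan)) // 103: wait for the previous result
}

The leader then blocks (via SnapshotMinIndex, sketched above) until its state store reaches that minimum index before evaluating the plan.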
16 changes: 10 additions & 6 deletions nomad/worker.go
@@ -118,7 +118,7 @@ func (w *Worker) run() {
}

// Wait for the raft log to catch up to the evaluation
snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendAck(eval.ID, token, false)
@@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
}
}

// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
// snapshotMinIndex times calls to StateStore.SnapshotMinIndex which may block.
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
cancel()
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)

@@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
// Add the evaluation token to the plan
plan.EvalToken = w.evalToken

// Add SnapshotIndex to ensure the leader's StateStore processes the Plan
// at or after the index at which it was created.
plan.SnapshotIndex = w.snapshotIndex

// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
if normalizePlan {
Expand Down Expand Up @@ -319,7 +323,7 @@ SUBMIT:
}

// Check if a state update is required. This could be required if we
// planning based on stale data, which is causing issues. For example, a
// planned based on stale data, which is causing issues. For example, a
// node failure since the time we've started planning or conflicting task
// allocations.
var state scheduler.State
@@ -328,7 +332,7 @@
w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)

var err error
state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit)
if err != nil {
return nil, nil, err
}
4 changes: 2 additions & 2 deletions nomad/worker_test.go
@@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) {

// Wait for a future index
w := &Worker{srv: s1, logger: s1.logger}
snap, err := w.snapshotAfter(index+1, time.Second)
snap, err := w.snapshotMinIndex(index+1, time.Second)
require.NoError(t, err)
require.NotNil(t, snap)

@@ -306,7 +306,7 @@
// Cause a timeout
waitIndex := index + 100
timeout := 10 * time.Millisecond
snap, err = w.snapshotAfter(index+100, timeout)
snap, err = w.snapshotMinIndex(index+100, timeout)
require.Nil(t, snap)
require.EqualError(t, err,
fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))
