nomad: include snapshot index when submitting plans #5791

Merged · 4 commits · Jul 17, 2019
106 changes: 86 additions & 20 deletions nomad/plan_apply.go
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"runtime"
"time"
@@ -68,11 +69,22 @@ func newPlanner(s *Server) (*planner, error) {
// but there are many of those and only a single plan verifier.
//
func (p *planner) planApply() {
// waitCh is used to track an outstanding application while snap
// holds an optimistic state which includes that plan application.
var waitCh chan struct{}
// planIndexCh is used to track an outstanding application and receive
// its committed index while snap holds an optimistic state which
// includes that plan application.
var planIndexCh chan uint64
var snap *state.StateSnapshot

// prevPlanResultIndex is the index when the last PlanResult was
// committed. Since only the last plan is optimistically applied to the
// snapshot, it's possible the current snapshot's and plan's indexes
// are less than the index the previous plan result was committed at.
// prevPlanResultIndex also guards against the previous plan committing
// during Dequeue, thus causing the snapshot containing the optimistic
// commit to be discarded and potentially evaluating the current plan
// against an index older than the previous plan was committed at.
var prevPlanResultIndex uint64

// Setup a worker pool with half the cores, with at least 1
poolSize := runtime.NumCPU() / 2
if poolSize == 0 {
@@ -88,18 +100,35 @@ func (p *planner) planApply() {
return
}

// Check if our last plan has completed
// If last plan has completed get a new snapshot
select {
case <-waitCh:
waitCh = nil
case idx := <-planIndexCh:
// Previous plan committed. Discard snapshot and ensure
// future snapshots include this plan. idx may be 0 if
// plan failed to apply, so use max(prev, idx)
prevPlanResultIndex = max(prevPlanResultIndex, idx)
planIndexCh = nil
snap = nil
default:
}

if snap != nil {
// If snapshot doesn't contain the previous plan
// result's index and the current plan's snapshot index,
// discard it and get a new one below.
minIndex := max(prevPlanResultIndex, pending.plan.SnapshotIndex)
if idx, err := snap.LatestIndex(); err != nil || idx < minIndex {
snap = nil
}
}

// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = p.fsm.State().Snapshot()
// if no snapshot is available.
// - planIndexCh will be nil if the previous plan result applied
// during Dequeue
// - snap will be nil if its index < max(prevIndex, curIndex)
if planIndexCh == nil || snap == nil {
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
@@ -123,11 +152,12 @@ func (p *planner) planApply() {

// Ensure any parallel apply is complete before starting the next one.
// This also limits how out of date our snapshot can be.
if waitCh != nil {
<-waitCh
snap, err = p.fsm.State().Snapshot()
if planIndexCh != nil {
idx := <-planIndexCh
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
p.logger.Error("failed to update snapshot state", "error", err)
pending.respond(nil, err)
continue
}
@@ -141,12 +171,35 @@ func (p *planner) planApply() {
continue
}

// Respond to the plan in async
waitCh = make(chan struct{})
go p.asyncPlanWait(waitCh, future, result, pending)
// Respond to the plan in async; receive plan's committed index via chan
planIndexCh = make(chan uint64, 1)
go p.asyncPlanWait(planIndexCh, future, result, pending)
}
}

// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64) (*state.StateSnapshot, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "wait_for_index"}, time.Now())

// Minimum index the snapshot must include is the max of the previous
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
Contributor comment: Should we use a defer for the cancel invocation? It's a little more idiomatic and is possible now that you've extracted the function.

(A sketch of the defer variant follows the plan_apply.go diff below.)

if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
timeout, minIndex, prevPlanResultIndex, planSnapshotIndex)
}

return snap, err
}

// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Setup the update request
@@ -306,21 +359,26 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
}
}

// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
// asyncPlanWait is used to apply and respond to a plan async. On successful
// commit the plan's index will be sent on the chan. On error the chan will be
// closed.
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(waitCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

// Respond to the plan
result.AllocIndex = future.Index()
index := future.Index()
result.AllocIndex = index

// If this is a partial plan application, we need to ensure the scheduler
// at least has visibility into any placements it made to avoid double placement.
@@ -330,6 +388,7 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result.RefreshIndex = maxUint64(result.RefreshIndex, result.AllocIndex)
}
pending.respond(result, nil)
indexCh <- index
}

// evaluatePlan is used to determine what portions of a plan
@@ -619,3 +678,10 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true)
return fit, reason, err
}

func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
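For context on the contributor comment above about deferring the cancel invocation: a minimal sketch of how snapshotMinIndex could look with cancellation handled by defer, reusing only the fields, helpers, and error message shown in this diff. This variant is an illustration, not part of the merged change.

// Sketch only: the reviewer's defer suggestion applied to snapshotMinIndex.
func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64) (*state.StateSnapshot, error) {
	defer metrics.MeasureSince([]string{"nomad", "plan", "wait_for_index"}, time.Now())

	// The snapshot must include both the previous plan result's index and
	// the current plan's snapshot index.
	minIndex := max(prevPlanResultIndex, planSnapshotIndex)

	const timeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// Deferring cancel releases the context's timer on every return path.
	defer cancel()

	snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
	if err == context.DeadlineExceeded {
		return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
			timeout, minIndex, prevPlanResultIndex, planSnapshotIndex)
	}

	return snap, err
}

The behavior is unchanged; the only difference from the diff above is that cancel runs via defer rather than immediately after the SnapshotMinIndex call, which stays correct if an early return is ever added.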
4 changes: 2 additions & 2 deletions nomad/state/state_store.go
@@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}

// SnapshotAfter is used to create a point in time snapshot where the index is
// SnapshotMinIndex is used to create a state snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
@@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
//
// Callers should maintain their own timer metric as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
// Ported from work.go:waitForIndex prior to 0.9

const backoffBase = 20 * time.Millisecond
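To make the SnapshotMinIndex contract above concrete (block until the store's latest index reaches the requested index, bounded by the caller's context), here is a self-contained toy model. None of these types are Nomad's, and the flat 20ms poll stands in for the real implementation's backoff, which starts at the backoffBase shown above.

// Toy model of the SnapshotMinIndex contract: wait until local state has
// reached at least the requested index, or fail when the caller's context
// expires. Illustrative only; not Nomad code.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type toyStore struct{ latestIndex atomic.Uint64 }

func (s *toyStore) SnapshotMinIndex(ctx context.Context, index uint64) (uint64, error) {
	for {
		if latest := s.latestIndex.Load(); latest >= index {
			return latest, nil // the "snapshot" here is just the observed index
		}
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-time.After(20 * time.Millisecond):
			// poll again; the real code backs off between checks
		}
	}
}

func main() {
	s := &toyStore{}
	s.latestIndex.Store(5)

	// Simulate Raft log application catching up in the background.
	go func() {
		time.Sleep(50 * time.Millisecond)
		s.latestIndex.Store(7)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	idx, err := s.SnapshotMinIndex(ctx, 7) // blocks until latestIndex >= 7
	fmt.Println(idx, err)                  // 7 <nil>
}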
26 changes: 13 additions & 13 deletions nomad/state/state_store_test.go
@@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
require.Nil(denormalizedAllocs)
}

// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks
// until the StateStore's latest index is >= the requested index.
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
func TestStateStore_SnapshotMinIndex_OK(t *testing.T) {
t.Parallel()

s := testStateStore(t)
@@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
node := mock.Node()
require.NoError(t, s.UpsertNode(index+1, node))

// Assert SnapshotAfter returns immediately if index < latest index
// Assert SnapshotMinIndex returns immediately if index < latest index
ctx, cancel := context.WithTimeout(context.Background(), 0)
snap, err := s.SnapshotAfter(ctx, index)
snap, err := s.SnapshotMinIndex(ctx, index)
cancel()
require.NoError(t, err)

@@ -7162,24 +7162,24 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
require.Fail(t, "snapshot index should be greater than index")
}

// Assert SnapshotAfter returns immediately if index == latest index
// Assert SnapshotMinIndex returns immediately if index == latest index
ctx, cancel = context.WithTimeout(context.Background(), 0)
snap, err = s.SnapshotAfter(ctx, index+1)
snap, err = s.SnapshotMinIndex(ctx, index+1)
cancel()
require.NoError(t, err)

snapIndex, err = snap.LatestIndex()
require.NoError(t, err)
require.Equal(t, snapIndex, index+1)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
errCh := make(chan error, 1)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
defer close(errCh)
waitIndex := index + 2
snap, err := s.SnapshotAfter(ctx, waitIndex)
snap, err := s.SnapshotMinIndex(ctx, waitIndex)
if err != nil {
errCh <- err
return
@@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(5 * time.Second):
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock")
}
}

// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex
// returns an error if the desired index is not reached within the deadline.
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) {
t.Parallel()

s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
snap, err := s.SnapshotAfter(ctx, index+1)
snap, err := s.SnapshotMinIndex(ctx, index+1)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, snap)
}
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
@@ -8605,6 +8605,11 @@ type Plan struct {
// lower priority jobs that are preempted. Preempted allocations are marked
// as evicted.
NodePreemptions map[string][]*Allocation

// SnapshotIndex is the Raft index of the snapshot used to create the
// Plan. The leader will wait to evaluate the plan until its StateStore
// has reached at least this index.
SnapshotIndex uint64
}

// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
16 changes: 10 additions & 6 deletions nomad/worker.go
@@ -118,7 +118,7 @@ func (w *Worker) run() {
}

// Wait for the raft log to catchup to the evaluation
snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendAck(eval.ID, token, false)
@@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
}
}

// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
// snapshotMinIndex times calls to StateStore.SnapshotMinIndex which may block.
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
cancel()
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)

@@ -284,6 +284,10 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
// Add the evaluation token to the plan
plan.EvalToken = w.evalToken

// Add SnapshotIndex to ensure leader's StateStore processes the Plan
// at or after the index at which it was created.
plan.SnapshotIndex = w.snapshotIndex

// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
if normalizePlan {
@@ -319,7 +323,7 @@ SUBMIT:
}

// Check if a state update is required. This could be required if we
// planning based on stale data, which is causing issues. For example, a
// planned based on stale data, which is causing issues. For example, a
// node failure since the time we've started planning or conflicting task
// allocations.
var state scheduler.State
@@ -328,7 +332,7 @@
w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)

var err error
state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit)
if err != nil {
return nil, nil, err
}
4 changes: 2 additions & 2 deletions nomad/worker_test.go
@@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) {

// Wait for a future index
w := &Worker{srv: s1, logger: s1.logger}
snap, err := w.snapshotAfter(index+1, time.Second)
snap, err := w.snapshotMinIndex(index+1, time.Second)
require.NoError(t, err)
require.NotNil(t, snap)

@@ -306,7 +306,7 @@
// Cause a timeout
waitIndex := index + 100
timeout := 10 * time.Millisecond
snap, err = w.snapshotAfter(index+100, timeout)
snap, err = w.snapshotMinIndex(index+100, timeout)
require.Nil(t, snap)
require.EqualError(t, err,
fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))