Block plan application until state store has caught up to raft #5411

Merged · 4 commits · May 20, 2019
Changes from all commits
2 changes: 1 addition & 1 deletion nomad/blocked_evals.go
@@ -666,7 +666,7 @@ func (b *BlockedEvals) Stats() *BlockedStats {
 }
 
 // EmitStats is used to export metrics about the blocked eval tracker while enabled
-func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) {
+func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 	for {
 		select {
 		case <-time.After(period):
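A note on the EmitStats signature changes in this and the following files: Context.Done() returns a receive-only <-chan struct{}, which cannot be passed where a bidirectional chan struct{} is expected. Widening the parameter to <-chan struct{} is what lets the server hand its context-derived shutdown channel to these loops. A minimal, self-contained sketch of the rule; the emit function below is illustrative, not Nomad's:

// Sketch: ctx.Done() has type <-chan struct{}, which is assignable to a
// <-chan struct{} parameter but not to a bidirectional chan struct{} one.
package main

import (
	"context"
	"fmt"
	"time"
)

func emit(period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-time.After(period):
			fmt.Println("tick")
		case <-stopCh:
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go emit(10*time.Millisecond, ctx.Done()) // compiles only because emit takes <-chan struct{}
	time.Sleep(35 * time.Millisecond)
	cancel()
	time.Sleep(5 * time.Millisecond)
}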
2 changes: 1 addition & 1 deletion nomad/eval_broker.go
@@ -822,7 +822,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
 }
 
 // EmitStats is used to export metrics about the broker while enabled
-func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) {
+func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 	for {
 		select {
 		case <-time.After(period):
11 changes: 10 additions & 1 deletion nomad/plan_apply.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"context"
 	"fmt"
 	"runtime"
 	"time"
@@ -99,7 +100,15 @@ func (p *planner) planApply() {
 		// Snapshot the state so that we have a consistent view of the world
 		// if no snapshot is available
 		if waitCh == nil || snap == nil {
-			snap, err = p.fsm.State().Snapshot()
+			// Wait up to 5s for raft to catch up. Timing out
+			// causes the eval to be nacked and retried on another
+			// server, so timing out too quickly could cause
+			// greater scheduling latency than if we just waited
+			// longer here.
+			const indexTimeout = 5 * time.Second
+			ctx, cancel := context.WithTimeout(context.Background(), indexTimeout)
+			snap, err = p.fsm.State().SnapshotAfter(ctx, p.raft.LastIndex())
+			cancel()
 			if err != nil {
 				p.logger.Error("failed to snapshot state", "error", err)
 				pending.respond(nil, err)
2 changes: 1 addition & 1 deletion nomad/plan_queue.go
@@ -195,7 +195,7 @@ func (q *PlanQueue) Stats() *QueueStats {
 }
 
 // EmitStats is used to export metrics about the broker while enabled
-func (q *PlanQueue) EmitStats(period time.Duration, stopCh chan struct{}) {
+func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
 	for {
 		select {
 		case <-time.After(period):
11 changes: 8 additions & 3 deletions nomad/server.go
@@ -221,8 +221,11 @@ type Server struct {
 
 	left         bool
 	shutdown     bool
-	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
+
+	shutdownCtx    context.Context
+	shutdownCancel context.CancelFunc
+	shutdownCh     <-chan struct{}
 }
 
 // Holds the RPC endpoints
@@ -303,9 +306,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
 		blockedEvals: NewBlockedEvals(evalBroker, logger),
 		rpcTLS:       incomingTLS,
 		aclCache:     aclCache,
-		shutdownCh:   make(chan struct{}),
 	}
 
+	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
+	s.shutdownCh = s.shutdownCtx.Done()
+
 	// Create the RPC handler
 	s.rpcHandler = newRpcHandler(s)
 
@@ -530,7 +535,7 @@ func (s *Server) Shutdown() error {
 	}
 
 	s.shutdown = true
-	close(s.shutdownCh)
+	s.shutdownCancel()
 
 	if s.serf != nil {
 		s.serf.Shutdown()
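The server.go change swaps a close-on-shutdown channel for a cancellable context: cancel() closes ctx.Done(), so existing select loops over shutdownCh keep working unchanged, while the context itself can be threaded into context-aware calls such as SnapshotAfter. A compressed sketch of the pattern; the server type and constructor below are illustrative stand-ins, not Nomad's actual code:

// Sketch: replacing a close-on-shutdown channel with context cancellation.
package main

import (
	"context"
	"fmt"
)

type server struct {
	shutdownCtx    context.Context
	shutdownCancel context.CancelFunc
	shutdownCh     <-chan struct{}
}

func newServer() *server {
	s := &server{}
	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
	s.shutdownCh = s.shutdownCtx.Done()
	return s
}

func main() {
	s := newServer()
	s.shutdownCancel() // plays the role of close(shutdownCh), but is safe to call twice
	<-s.shutdownCh     // receives immediately once the context is cancelled
	fmt.Println("shutdown observed")
}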
62 changes: 0 additions & 62 deletions nomad/state/notify.go

This file was deleted.

72 changes: 0 additions & 72 deletions nomad/state/notify_test.go

This file was deleted.

55 changes: 55 additions & 0 deletions nomad/state/state_store.go
@@ -101,6 +101,61 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
 	return snap, nil
 }
 
+// SnapshotAfter is used to create a point in time snapshot where the index is
+// guaranteed to be greater than or equal to the index parameter.
+//
+// Some server operations (such as scheduling) exchange objects via RPC
+// concurrent with Raft log application, so they must ensure the state store
+// snapshot they are operating on is at or after the index the objects
+// retrieved via RPC were applied to the Raft log at.
+//
+// Callers should maintain their own timer metric as the time this method
+// blocks indicates Raft log application latency relative to scheduling.
+func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
+	// Ported from work.go:waitForIndex prior to 0.9
+
+	const backoffBase = 20 * time.Millisecond
+	const backoffLimit = 1 * time.Second
+	var retries uint
+	var retryTimer *time.Timer
+
+	// XXX: Potential optimization is to set up a watch on the state
+	// store's index table and only unblock via a trigger rather than
+	// polling.
+	for {
+		// Get the state's current index
+		snapshotIndex, err := s.LatestIndex()
+		if err != nil {
+			return nil, fmt.Errorf("failed to determine state store's index: %v", err)
+		}
+
+		// We only need the FSM state to be as recent as the given index
+		if snapshotIndex >= index {
+			return s.Snapshot()
+		}
+
+		// Exponential backoff
+		retries++
+		if retryTimer == nil {
+			// First retry, start at baseline
+			retryTimer = time.NewTimer(backoffBase)
+		} else {
+			// Subsequent retry, reset timer
+			deadline := 1 << (2 * retries) * backoffBase
+			if deadline > backoffLimit {
+				deadline = backoffLimit
+			}
+			retryTimer.Reset(deadline)
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-retryTimer.C:
+		}
+	}
+}
+
 // Restore is used to optimize the efficiency of rebuilding
 // state by minimizing the number of transactions and checking
 // overhead.
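Given backoffBase = 20ms and backoffLimit = 1s, the wait schedule SnapshotAfter produces is 20ms for the first retry (the NewTimer path), then (1 << (2*retries)) * backoffBase capped at the limit: 320ms on the second retry, and 1s on every retry after that. A standalone sketch that prints the schedule, lifted from the loop above:

// Sketch: the backoff schedule SnapshotAfter produces, extracted so it
// can be computed and printed on its own.
package main

import (
	"fmt"
	"time"
)

func main() {
	const backoffBase = 20 * time.Millisecond
	const backoffLimit = 1 * time.Second

	for retries := uint(1); retries <= 5; retries++ {
		var deadline time.Duration
		if retries == 1 {
			// First retry starts at the baseline
			deadline = backoffBase
		} else {
			// Subsequent retries grow geometrically, capped at the limit
			deadline = 1 << (2 * retries) * backoffBase
			if deadline > backoffLimit {
				deadline = backoffLimit
			}
		}
		fmt.Printf("retry %d: wait %v\n", retries, deadline) // 20ms, 320ms, 1s, 1s, 1s
	}
}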
94 changes: 94 additions & 0 deletions nomad/state/state_store_test.go
@@ -7118,6 +7118,100 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testing.T) {
 	require.Nil(denormalizedAllocs)
 }
 
+// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
+// until the StateStore's latest index is >= the requested index.
+func TestStateStore_SnapshotAfter_OK(t *testing.T) {
+	t.Parallel()
+
+	s := testStateStore(t)
+	index, err := s.LatestIndex()
+	require.NoError(t, err)
+
+	node := mock.Node()
+	require.NoError(t, s.UpsertNode(index+1, node))
+
+	// Assert SnapshotAfter returns immediately if index < latest index
+	ctx, cancel := context.WithTimeout(context.Background(), 0)
+	snap, err := s.SnapshotAfter(ctx, index)
+	cancel()
+	require.NoError(t, err)
+
+	snapIndex, err := snap.LatestIndex()
+	require.NoError(t, err)
+	if snapIndex <= index {
+		require.Fail(t, "snapshot index should be greater than index")
+	}
+
+	// Assert SnapshotAfter returns immediately if index == latest index
+	ctx, cancel = context.WithTimeout(context.Background(), 0)
+	snap, err = s.SnapshotAfter(ctx, index+1)
+	cancel()
+	require.NoError(t, err)
+
+	snapIndex, err = snap.LatestIndex()
+	require.NoError(t, err)
+	require.Equal(t, snapIndex, index+1)
+
+	// Assert SnapshotAfter blocks if index > latest index
+	errCh := make(chan error, 1)
+	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	go func() {
+		defer close(errCh)
+		waitIndex := index + 2
+		snap, err := s.SnapshotAfter(ctx, waitIndex)
+		if err != nil {
+			errCh <- err
+			return
+		}
+
+		snapIndex, err := snap.LatestIndex()
+		if err != nil {
+			errCh <- err
+			return
+		}
+
+		if snapIndex < waitIndex {
+			errCh <- fmt.Errorf("snapshot index < wait index: %d < %d", snapIndex, waitIndex)
+			return
+		}
+	}()
+
+	select {
+	case err := <-errCh:
+		require.NoError(t, err)
+	case <-time.After(500 * time.Millisecond):
+		// Let it block for a bit before unblocking by upserting
+	}
+
+	node.Name = "hal"
+	require.NoError(t, s.UpsertNode(index+2, node))
+
+	select {
+	case err := <-errCh:
+		require.NoError(t, err)
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
+	}
+}
+
+// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
+// returns an error if the desired index is not reached within the deadline.
+func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
+	t.Parallel()
+
+	s := testStateStore(t)
+	index, err := s.LatestIndex()
+	require.NoError(t, err)
+
+	// Assert SnapshotAfter blocks if index > latest index
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+	snap, err := s.SnapshotAfter(ctx, index+1)
+	require.EqualError(t, err, context.DeadlineExceeded.Error())
+	require.Nil(t, snap)
+}
+
 // watchFired is a helper for unit tests that returns if the given watch set
 // fired (it doesn't care which watch actually fired). This uses a fixed
 // timeout since we already expect the event happened before calling this and