Merge pull request #5411 from hashicorp/b-snapshotafter
Block plan application until state store has caught up to raft
schmichael authored May 20, 2019
2 parents 0ecbfe6 + a2e4f12 commit 59946ff
Showing 13 changed files with 219 additions and 215 deletions.
2 changes: 1 addition & 1 deletion nomad/blocked_evals.go
@@ -666,7 +666,7 @@ func (b *BlockedEvals) Stats() *BlockedStats {
}

// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) {
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):
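This signature change, repeated below for eval_broker.go and plan_queue.go, widens EmitStats to accept a receive-only channel. A minimal sketch of why that matters, with illustrative names that are not from the diff: a context's Done() method returns a <-chan struct{}, so the receive-only parameter type lets callers hand EmitStats a context's done channel directly. That lines up with server.go below, where shutdownCh becomes the receive-only channel returned by ctx.Done().

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// emitStats mirrors the EmitStats shape. It only ever receives from
// stopCh, so the receive-only type is the honest one, and it matches
// the type ctx.Done() returns.
func emitStats(period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-time.After(period):
			fmt.Println("emitting metrics")
		case <-stopCh:
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// With a bidirectional chan struct{} parameter this would not
	// compile: ctx.Done() has static type <-chan struct{}.
	go emitStats(50*time.Millisecond, ctx.Done())
	time.Sleep(120 * time.Millisecond)
	cancel()
	time.Sleep(20 * time.Millisecond) // give the goroutine time to exit
}
```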
2 changes: 1 addition & 1 deletion nomad/eval_broker.go
@@ -822,7 +822,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
}

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) {
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):
11 changes: 10 additions & 1 deletion nomad/plan_apply.go
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"runtime"
"time"
@@ -99,7 +100,15 @@ func (p *planner) planApply() {
// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = p.fsm.State().Snapshot()
// Wait up to 5s for raft to catch up. Timing out
// causes the eval to be nacked and retried on another
// server, so timing out too quickly could cause
// greater scheduling latency than if we just waited
// longer here.
const indexTimeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), indexTimeout)
snap, err = p.fsm.State().SnapshotAfter(ctx, p.raft.LastIndex())
cancel()
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
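The new code above bounds the catch-up wait with context.WithTimeout and calls cancel() immediately after the call returns, rather than deferring it, because the context's only job is to bound that single call. A self-contained sketch of the same shape, with stand-in names: waitForIndex here plays the role SnapshotAfter plays in the diff.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForIndex blocks until the applied index reaches index or the
// context expires, polling in lieu of a real notification mechanism.
func waitForIndex(ctx context.Context, applied *atomic.Uint64, index uint64) error {
	for applied.Load() < index {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(20 * time.Millisecond):
		}
	}
	return nil
}

func main() {
	var applied atomic.Uint64
	go func() { // simulate Raft log application catching up over time
		for i := 0; i < 10; i++ {
			time.Sleep(10 * time.Millisecond)
			applied.Add(1)
		}
	}()

	const indexTimeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), indexTimeout)
	err := waitForIndex(ctx, &applied, 5)
	cancel() // the context is scoped to this one call, so no defer
	fmt.Println("caught up:", err == nil)
}
```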
2 changes: 1 addition & 1 deletion nomad/plan_queue.go
@@ -195,7 +195,7 @@ func (q *PlanQueue) Stats() *QueueStats {
}

// EmitStats is used to export metrics about the plan queue while enabled
func (q *PlanQueue) EmitStats(period time.Duration, stopCh chan struct{}) {
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):
11 changes: 8 additions & 3 deletions nomad/server.go
@@ -221,8 +221,11 @@ type Server struct {

left bool
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex

shutdownCtx context.Context
shutdownCancel context.CancelFunc
shutdownCh <-chan struct{}
}

// Holds the RPC endpoints
@@ -303,9 +306,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
shutdownCh: make(chan struct{}),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()

// Create the RPC handler
s.rpcHandler = newRpcHandler(s)

@@ -530,7 +535,7 @@ func (s *Server) Shutdown() error {
}

s.shutdown = true
close(s.shutdownCh)
s.shutdownCancel()

if s.serf != nil {
s.serf.Shutdown()
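A sketch, with illustrative names, of the shutdown pattern server.go moves to: a cancelled context in place of a close()d channel. One practical difference is that a context.CancelFunc may be called any number of times, while closing an already-closed channel panics; and ctx.Done() yields the same receive-only channel type the EmitStats signatures above now accept.

```go
package main

import (
	"context"
	"fmt"
)

type server struct {
	shutdownCtx    context.Context
	shutdownCancel context.CancelFunc
	shutdownCh     <-chan struct{}
}

func newServer() *server {
	s := &server{}
	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
	s.shutdownCh = s.shutdownCtx.Done()
	return s
}

func (s *server) Shutdown() {
	s.shutdownCancel() // idempotent, unlike close(s.shutdownCh)
}

func main() {
	s := newServer()
	s.Shutdown()
	s.Shutdown() // a second cancel is a no-op; a second close() would panic
	select {
	case <-s.shutdownCh:
		fmt.Println("shutdown observed")
	default:
		fmt.Println("still running")
	}
}
```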
62 changes: 0 additions & 62 deletions nomad/state/notify.go

This file was deleted.

72 changes: 0 additions & 72 deletions nomad/state/notify_test.go

This file was deleted.

55 changes: 55 additions & 0 deletions nomad/state/state_store.go
@@ -101,6 +101,61 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}

// SnapshotAfter is used to create a point in time snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
// concurrently with Raft log application, so they must ensure the state
// store snapshot they operate on is at or after the Raft index at which
// the objects retrieved via RPC were applied.
//
// Callers should maintain their own timer metric as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
// Ported from work.go:waitForIndex prior to 0.9

const backoffBase = 20 * time.Millisecond
const backoffLimit = 1 * time.Second
var retries uint
var retryTimer *time.Timer

// XXX: Potential optimization is to set up a watch on the state
// store's index table and only unblock via a trigger rather than
// polling.
for {
// Get the state store's current index
snapshotIndex, err := s.LatestIndex()
if err != nil {
return nil, fmt.Errorf("failed to determine state store's index: %v", err)
}

// We only need the FSM state to be as recent as the given index
if snapshotIndex >= index {
return s.Snapshot()
}

// Exponential back off
retries++
if retryTimer == nil {
// First retry, start at baseline
retryTimer = time.NewTimer(backoffBase)
} else {
// Subsequent retry, reset timer
deadline := 1 << (2 * retries) * backoffBase
if deadline > backoffLimit {
deadline = backoffLimit
}
retryTimer.Reset(deadline)
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-retryTimer.C:
}
}
}

// Restore is used to optimize the efficiency of rebuilding
// state by minimizing the number of transactions and checking
// overhead.
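The backoff expression reads densely, so here is a small sketch that reproduces its arithmetic to make the schedule visible: the first retry waits backoffBase, and each subsequent retry waits (1 << (2*retries)) * backoffBase capped at backoffLimit, i.e. 20ms, then 320ms, then 1s from the third retry onward.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const backoffBase = 20 * time.Millisecond
	const backoffLimit = 1 * time.Second

	for retries := uint(1); retries <= 5; retries++ {
		var wait time.Duration
		if retries == 1 {
			wait = backoffBase // first retry starts at the baseline
		} else {
			wait = 1 << (2 * retries) * backoffBase
			if wait > backoffLimit {
				wait = backoffLimit
			}
		}
		fmt.Printf("retry %d: wait %v\n", retries, wait)
	}
}
```

This prints 20ms, 320ms, 1s, 1s, 1s: the cap dominates from the third retry on, so under the 5s indexTimeout used in plan_apply.go the plan applier gets roughly five more polls before the context expires.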
94 changes: 94 additions & 0 deletions nomad/state/state_store_test.go
@@ -7118,6 +7118,100 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testing.T) {
require.Nil(denormalizedAllocs)
}

// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
// until the StateStore's latest index is >= the requested index.
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
t.Parallel()

s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)

node := mock.Node()
require.NoError(t, s.UpsertNode(index+1, node))

// Assert SnapshotAfter returns immediately if index < latest index
ctx, cancel := context.WithTimeout(context.Background(), 0)
snap, err := s.SnapshotAfter(ctx, index)
cancel()
require.NoError(t, err)

snapIndex, err := snap.LatestIndex()
require.NoError(t, err)
if snapIndex <= index {
require.Fail(t, "snapshot index should be greater than index")
}

// Assert SnapshotAfter returns immediately if index == latest index
ctx, cancel = context.WithTimeout(context.Background(), 0)
snap, err = s.SnapshotAfter(ctx, index+1)
cancel()
require.NoError(t, err)

snapIndex, err = snap.LatestIndex()
require.NoError(t, err)
require.Equal(t, snapIndex, index+1)

// Assert SnapshotAfter blocks if index > latest index
errCh := make(chan error, 1)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
defer close(errCh)
waitIndex := index + 2
snap, err := s.SnapshotAfter(ctx, waitIndex)
if err != nil {
errCh <- err
return
}

snapIndex, err := snap.LatestIndex()
if err != nil {
errCh <- err
return
}

if snapIndex < waitIndex {
errCh <- fmt.Errorf("snapshot index < wait index: %d < %d", snapIndex, waitIndex)
return
}
}()

select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(500 * time.Millisecond):
// Let it block for a bit before unblocking by upserting
}

node.Name = "hal"
require.NoError(t, s.UpsertNode(index+2, node))

select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(5 * time.Second):
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
}
}

// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
// returns an error if the desired index is not reached within the deadline.
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
t.Parallel()

s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)

// Assert SnapshotAfter blocks if index > latest index
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
snap, err := s.SnapshotAfter(ctx, index+1)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, snap)
}

// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and