Skip to content

Commit

Permalink
nomad: SnapshotAfter -> SnapshotMinIndex
Browse files Browse the repository at this point in the history
Rename SnapshotAfter to SnapshotMinIndex. The old name was not
technically accurate. SnapshotAtOrAfter is more accurate, but wordy and
still lacks context about what precisely it is at or after (the index).

SnapshotMinIndex was chosen as it describes the action (snapshot), a
constraint (minimum), and the object of the constraint (index).
  • Loading branch information
schmichael committed Jun 24, 2019
1 parent 153ff1e commit 31ea934
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64

const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotAfter(ctx, minIndex)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
Expand Down
4 changes: 2 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}

// SnapshotAfter is used to create a point in time snapshot where the index is
// SnapshotMinIndex is used to create a state snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
Expand All @@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
//
// Callers should maintain their own timer metric as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
// Ported from work.go:waitForIndex prior to 0.9

const backoffBase = 20 * time.Millisecond
Expand Down
26 changes: 13 additions & 13 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
require.Nil(denormalizedAllocs)
}

// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks
// until the StateStore's latest index is >= the requested index.
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
func TestStateStore_SnapshotMinIndex_OK(t *testing.T) {
t.Parallel()

s := testStateStore(t)
Expand All @@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
node := mock.Node()
require.NoError(t, s.UpsertNode(index+1, node))

// Assert SnapshotAfter returns immediately if index < latest index
// Assert SnapshotMinIndex returns immediately if index < latest index
ctx, cancel := context.WithTimeout(context.Background(), 0)
snap, err := s.SnapshotAfter(ctx, index)
snap, err := s.SnapshotMinIndex(ctx, index)
cancel()
require.NoError(t, err)

Expand All @@ -7162,24 +7162,24 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
require.Fail(t, "snapshot index should be greater than index")
}

// Assert SnapshotAfter returns immediately if index == latest index
// Assert SnapshotMinIndex returns immediately if index == latest index
ctx, cancel = context.WithTimeout(context.Background(), 0)
snap, err = s.SnapshotAfter(ctx, index+1)
snap, err = s.SnapshotMinIndex(ctx, index+1)
cancel()
require.NoError(t, err)

snapIndex, err = snap.LatestIndex()
require.NoError(t, err)
require.Equal(t, snapIndex, index+1)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
errCh := make(chan error, 1)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
defer close(errCh)
waitIndex := index + 2
snap, err := s.SnapshotAfter(ctx, waitIndex)
snap, err := s.SnapshotMinIndex(ctx, waitIndex)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(5 * time.Second):
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock")
}
}

// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex
// returns an error if the desired index is not reached within the deadline.
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) {
t.Parallel()

s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)

// Assert SnapshotAfter blocks if index > latest index
// Assert SnapshotMinIndex blocks if index > latest index
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
snap, err := s.SnapshotAfter(ctx, index+1)
snap, err := s.SnapshotMinIndex(ctx, index+1)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, snap)
}
Expand Down
10 changes: 5 additions & 5 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (w *Worker) run() {
}

// Wait for the raft log to catchup to the evaluation
snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendAck(eval.ID, token, false)
Expand Down Expand Up @@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
}
}

// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
cancel()
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)

Expand Down Expand Up @@ -332,7 +332,7 @@ SUBMIT:
w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)

var err error
state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) {

// Wait for a future index
w := &Worker{srv: s1, logger: s1.logger}
snap, err := w.snapshotAfter(index+1, time.Second)
snap, err := w.snapshotMinIndex(index+1, time.Second)
require.NoError(t, err)
require.NotNil(t, snap)

Expand All @@ -306,7 +306,7 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause a timeout
waitIndex := index + 100
timeout := 10 * time.Millisecond
snap, err = w.snapshotAfter(index+100, timeout)
snap, err = w.snapshotMinIndex(index+100, timeout)
require.Nil(t, snap)
require.EqualError(t, err,
fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))
Expand Down

0 comments on commit 31ea934

Please sign in to comment.