From 31ea9343d06d720888cebd8a13063618baa4b551 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 24 Jun 2019 11:59:44 -0700 Subject: [PATCH] nomad: SnapshotAfter -> SnapshotMinIndex Rename SnapshotAfter to SnapshotMinIndex. The old name was not technically accurate. SnapshotAtOrAfter is more accurate, but wordy and still lacks context about what precisely it is at or after (the index). SnapshotMinIndex was chosen as it describes the action (snapshot), a constraint (minimum), and the object of the constraint (index). --- nomad/plan_apply.go | 2 +- nomad/state/state_store.go | 4 ++-- nomad/state/state_store_test.go | 26 +++++++++++++------------- nomad/worker.go | 10 +++++----- nomad/worker_test.go | 4 ++-- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 6968e7eda457..4d9fccc08556 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -184,7 +184,7 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 const timeout = 5 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) - snap, err := p.fsm.State().SnapshotAfter(ctx, minIndex) + snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex) cancel() if err == context.DeadlineExceeded { return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09c74286400a..af522d4483bd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } -// SnapshotAfter is used to create a point in time snapshot where the index is +// SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // // Some server operations (such as scheduling) exchange objects via RPC @@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { // // Callers should maintain their own timer metric as the time this method // blocks indicates Raft log application latency relative to scheduling. -func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) { +func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) { // Ported from work.go:waitForIndex prior to 0.9 const backoffBase = 20 * time.Millisecond diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d21e4bbdefbc..4aa6e949241d 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi require.Nil(denormalizedAllocs) } -// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks +// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks // until the StateStore's latest index is >= the requested index. -func TestStateStore_SnapshotAfter_OK(t *testing.T) { +func TestStateStore_SnapshotMinIndex_OK(t *testing.T) { t.Parallel() s := testStateStore(t) @@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { node := mock.Node() require.NoError(t, s.UpsertNode(index+1, node)) - // Assert SnapshotAfter returns immediately if index < latest index + // Assert SnapshotMinIndex returns immediately if index < latest index ctx, cancel := context.WithTimeout(context.Background(), 0) - snap, err := s.SnapshotAfter(ctx, index) + snap, err := s.SnapshotMinIndex(ctx, index) cancel() require.NoError(t, err) @@ -7162,9 +7162,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { require.Fail(t, "snapshot index should be greater than index") } - // Assert SnapshotAfter returns immediately if index == latest index + // Assert SnapshotMinIndex returns immediately if index == latest index ctx, cancel = context.WithTimeout(context.Background(), 0) - snap, err = s.SnapshotAfter(ctx, index+1) + snap, err = s.SnapshotMinIndex(ctx, index+1) cancel() require.NoError(t, err) @@ -7172,14 +7172,14 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { require.NoError(t, err) require.Equal(t, snapIndex, index+1) - // Assert SnapshotAfter blocks if index > latest index + // Assert SnapshotMinIndex blocks if index > latest index errCh := make(chan error, 1) ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() go func() { defer close(errCh) waitIndex := index + 2 - snap, err := s.SnapshotAfter(ctx, waitIndex) + snap, err := s.SnapshotMinIndex(ctx, waitIndex) if err != nil { errCh <- err return @@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { case err := <-errCh: require.NoError(t, err) case <-time.After(5 * time.Second): - require.Fail(t, "timed out waiting for SnapshotAfter to unblock") + require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock") } } -// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter +// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex // returns an error if the desired index is not reached within the deadline. -func TestStateStore_SnapshotAfter_Timeout(t *testing.T) { +func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) { t.Parallel() s := testStateStore(t) index, err := s.LatestIndex() require.NoError(t, err) - // Assert SnapshotAfter blocks if index > latest index + // Assert SnapshotMinIndex blocks if index > latest index ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - snap, err := s.SnapshotAfter(ctx, index+1) + snap, err := s.SnapshotMinIndex(ctx, index+1) require.EqualError(t, err, context.DeadlineExceeded.Error()) require.Nil(t, snap) } diff --git a/nomad/worker.go b/nomad/worker.go index b29fa49ed5a3..06a8dc34db16 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -118,7 +118,7 @@ func (w *Worker) run() { } // Wait for the raft log to catchup to the evaluation - snap, err := w.snapshotAfter(waitIndex, raftSyncLimit) + snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit) if err != nil { w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) w.sendAck(eval.ID, token, false) @@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { } } -// snapshotAfter times calls to StateStore.SnapshotAfter which may block. -func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { +// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block. +func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { start := time.Now() ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout) - snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex) + snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex) cancel() metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) @@ -332,7 +332,7 @@ SUBMIT: w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID) var err error - state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit) + state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit) if err != nil { return nil, nil, err } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 70b0a1c99150..4be5041daefa 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) { // Wait for a future index w := &Worker{srv: s1, logger: s1.logger} - snap, err := w.snapshotAfter(index+1, time.Second) + snap, err := w.snapshotMinIndex(index+1, time.Second) require.NoError(t, err) require.NotNil(t, snap) @@ -306,7 +306,7 @@ func TestWorker_waitForIndex(t *testing.T) { // Cause a timeout waitIndex := index + 100 timeout := 10 * time.Millisecond - snap, err = w.snapshotAfter(index+100, timeout) + snap, err = w.snapshotMinIndex(index+100, timeout) require.Nil(t, snap) require.EqualError(t, err, fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))