diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 6968e7eda457..4d9fccc08556 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -184,7 +184,7 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 const timeout = 5 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) - snap, err := p.fsm.State().SnapshotAfter(ctx, minIndex) + snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex) cancel() if err == context.DeadlineExceeded { return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09c74286400a..af522d4483bd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } -// SnapshotAfter is used to create a point in time snapshot where the index is +// SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // // Some server operations (such as scheduling) exchange objects via RPC @@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { // // Callers should maintain their own timer metric as the time this method // blocks indicates Raft log application latency relative to scheduling. -func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) { +func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) { // Ported from work.go:waitForIndex prior to 0.9 const backoffBase = 20 * time.Millisecond diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d21e4bbdefbc..4aa6e949241d 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi require.Nil(denormalizedAllocs) } -// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks +// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks // until the StateStore's latest index is >= the requested index. -func TestStateStore_SnapshotAfter_OK(t *testing.T) { +func TestStateStore_SnapshotMinIndex_OK(t *testing.T) { t.Parallel() s := testStateStore(t) @@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { node := mock.Node() require.NoError(t, s.UpsertNode(index+1, node)) - // Assert SnapshotAfter returns immediately if index < latest index + // Assert SnapshotMinIndex returns immediately if index < latest index ctx, cancel := context.WithTimeout(context.Background(), 0) - snap, err := s.SnapshotAfter(ctx, index) + snap, err := s.SnapshotMinIndex(ctx, index) cancel() require.NoError(t, err) @@ -7162,9 +7162,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { require.Fail(t, "snapshot index should be greater than index") } - // Assert SnapshotAfter returns immediately if index == latest index + // Assert SnapshotMinIndex returns immediately if index == latest index ctx, cancel = context.WithTimeout(context.Background(), 0) - snap, err = s.SnapshotAfter(ctx, index+1) + snap, err = s.SnapshotMinIndex(ctx, index+1) cancel() require.NoError(t, err) @@ -7172,14 +7172,14 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { require.NoError(t, err) require.Equal(t, snapIndex, index+1) - // Assert SnapshotAfter blocks if index > latest index + // Assert SnapshotMinIndex blocks if index > latest index errCh := make(chan error, 1) ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() go func() { defer close(errCh) waitIndex := index + 2 - snap, err := s.SnapshotAfter(ctx, waitIndex) + snap, err := s.SnapshotMinIndex(ctx, waitIndex) if err != nil { errCh <- err return @@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) { case err := <-errCh: require.NoError(t, err) case <-time.After(5 * time.Second): - require.Fail(t, "timed out waiting for SnapshotAfter to unblock") + require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock") } } -// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter +// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex // returns an error if the desired index is not reached within the deadline. -func TestStateStore_SnapshotAfter_Timeout(t *testing.T) { +func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) { t.Parallel() s := testStateStore(t) index, err := s.LatestIndex() require.NoError(t, err) - // Assert SnapshotAfter blocks if index > latest index + // Assert SnapshotMinIndex blocks if index > latest index ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - snap, err := s.SnapshotAfter(ctx, index+1) + snap, err := s.SnapshotMinIndex(ctx, index+1) require.EqualError(t, err, context.DeadlineExceeded.Error()) require.Nil(t, snap) } diff --git a/nomad/worker.go b/nomad/worker.go index b29fa49ed5a3..06a8dc34db16 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -118,7 +118,7 @@ func (w *Worker) run() { } // Wait for the raft log to catchup to the evaluation - snap, err := w.snapshotAfter(waitIndex, raftSyncLimit) + snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit) if err != nil { w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) w.sendAck(eval.ID, token, false) @@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { } } -// snapshotAfter times calls to StateStore.SnapshotAfter which may block. -func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { +// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block. +func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { start := time.Now() ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout) - snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex) + snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex) cancel() metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) @@ -332,7 +332,7 @@ SUBMIT: w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID) var err error - state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit) + state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit) if err != nil { return nil, nil, err } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 70b0a1c99150..4be5041daefa 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) { // Wait for a future index w := &Worker{srv: s1, logger: s1.logger} - snap, err := w.snapshotAfter(index+1, time.Second) + snap, err := w.snapshotMinIndex(index+1, time.Second) require.NoError(t, err) require.NotNil(t, snap) @@ -306,7 +306,7 @@ func TestWorker_waitForIndex(t *testing.T) { // Cause a timeout waitIndex := index + 100 timeout := 10 * time.Millisecond - snap, err = w.snapshotAfter(index+100, timeout) + snap, err = w.snapshotMinIndex(index+100, timeout) require.Nil(t, snap) require.EqualError(t, err, fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))