From 633b4cab531a3b3b8afc6d86184ddc5a4eb6f63d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 29 Oct 2021 12:38:09 -0400 Subject: [PATCH] ignore validity window during wal replay (#4596) --- pkg/ingester/stream.go | 5 +++-- pkg/ingester/stream_test.go | 45 +++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 3ad77722a526..aa2d7e730bb4 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -182,7 +182,8 @@ func (s *stream) Push( s.chunkMtx.Lock() defer s.chunkMtx.Unlock() - if counter > 0 && counter <= s.entryCt { + isReplay := counter > 0 + if isReplay && counter <= s.entryCt { var byteCt int for _, e := range entries { byteCt += len(e.Line) @@ -256,7 +257,7 @@ func (s *stream) Push( } // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age. - if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) { + if !isReplay && s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind}) outOfOrderSamples++ outOfOrderBytes += len(entries[i].Line) diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index bcc7470a593a..dbfb5cf04334 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -339,6 +339,51 @@ func TestPushRateLimit(t *testing.T) { require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error()) } +func TestReplayAppendIgnoresValidityWindow(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + + cfg := defaultConfig() + cfg.MaxChunkAge = time.Minute + + s := newStream( + cfg, + limiter, + "fake", + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + true, + NilMetrics, + ) + + base := time.Now() + + entries := []logproto.Entry{ + {Timestamp: base, Line: "1"}, + } + + // Push a first entry (it doesn't matter if we look like we're replaying or not) + _, err = s.Push(context.Background(), entries, nil, 1) + require.Nil(t, err) + + // Create a sample outside the validity window + entries = []logproto.Entry{ + {Timestamp: base.Add(-time.Hour), Line: "2"}, + } + + // Pretend it's not a replay, ensure we error + _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0) + require.NotNil(t, err) + + // Now pretend it's a replay. The same write should succeed. + _, err = s.Push(context.Background(), entries, nil, 2) + require.Nil(t, err) + +} + func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) { var i int for got.Next() {