Skip to content

Commit

Permalink
ignore validity window during wal replay (#4596)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Oct 29, 2021
1 parent 45113cb commit 633b4ca
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func (s *stream) Push(
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()

if counter > 0 && counter <= s.entryCt {
isReplay := counter > 0
if isReplay && counter <= s.entryCt {
var byteCt int
for _, e := range entries {
byteCt += len(e.Line)
Expand Down Expand Up @@ -256,7 +257,7 @@ func (s *stream) Push(
}

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
if !isReplay && s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind})
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
Expand Down
45 changes: 45 additions & 0 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,51 @@ func TestPushRateLimit(t *testing.T) {
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}

func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

cfg := defaultConfig()
cfg.MaxChunkAge = time.Minute

s := newStream(
cfg,
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
true,
NilMetrics,
)

base := time.Now()

entries := []logproto.Entry{
{Timestamp: base, Line: "1"},
}

// Push a first entry (it doesn't matter if we look like we're replaying or not)
_, err = s.Push(context.Background(), entries, nil, 1)
require.Nil(t, err)

// Create a sample outside the validity window
entries = []logproto.Entry{
{Timestamp: base.Add(-time.Hour), Line: "2"},
}

// Pretend it's not a replay, ensure we error
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0)
require.NotNil(t, err)

// Now pretend it's a replay. The same write should succeed.
_, err = s.Push(context.Background(), entries, nil, 2)
require.Nil(t, err)

}

func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) {
var i int
for got.Next() {
Expand Down

0 comments on commit 633b4ca

Please sign in to comment.