diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 362df2c89338..236290151a3d 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -44,10 +44,34 @@ const ( defaultBlockSize = 256 * 1024 ) +var ( + HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt} +) + type HeadBlockFmt byte func (f HeadBlockFmt) Byte() byte { return byte(f) } +func (f HeadBlockFmt) String() string { + switch { + case f < UnorderedHeadBlockFmt: + return "ordered" + case f == UnorderedHeadBlockFmt: + return "unordered" + default: + return fmt.Sprintf("unknown: %v", byte(f)) + } +} + +func (f HeadBlockFmt) NewBlock() HeadBlock { + switch { + case f < UnorderedHeadBlockFmt: + return &headBlock{} + default: + return newUnorderedHeadBlock() + } +} + const ( _ HeadBlockFmt = iota // placeholders to start splitting chunk formats vs head block @@ -93,6 +117,7 @@ type MemChunk struct { // the chunk format default to v2 format byte encoding Encoding + headFmt HeadBlockFmt } type block struct { @@ -309,19 +334,18 @@ type entry struct { } // NewMemChunk returns a new in-mem chunk. -func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { - c := &MemChunk{ +func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { + return &MemChunk{ blockSize: blockSize, // The blockSize in bytes. targetSize: targetSize, // Desired chunk size in compressed bytes blocks: []block{}, - head: &headBlock{}, format: DefaultChunkFormat, + head: head.NewBlock(), encoding: enc, + headFmt: head, } - - return c } // NewByteChunk returns a MemChunk on the passed bytes. 
@@ -563,12 +587,19 @@ func (c *MemChunk) CheckpointSize() (chunk, head int) { return c.BytesSize(), c.head.CheckpointSize() } -func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) { +func MemchunkFromCheckpoint(chk, head []byte, desired HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error) { mc, err := NewByteChunk(chk, blockSize, targetSize) if err != nil { return nil, err } - return mc, mc.head.LoadBytes(head) + h, err := HeadFromCheckpoint(head, desired) + if err != nil { + return nil, err + } + + mc.head = h + mc.headFmt = desired + return mc, nil } // Encoding implements Chunk. @@ -642,7 +673,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { // If the head block is empty but there are cut blocks, we have to make // sure the new entry is not out of order compared to the previous block - if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { + if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { return ErrOutOfOrder } @@ -660,7 +691,37 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { // Close implements Chunk. // TODO: Fix this to check edge cases. func (c *MemChunk) Close() error { - return c.cut() + if err := c.cut(); err != nil { + return err + } + return c.reorder() +} + +// reorder ensures all blocks in a chunk are in +// monotonically increasing order. +// This mutates the chunk in place when blocks must be rebuilt. +func (c *MemChunk) reorder() error { + var lastMax int64 // placeholder to check order across blocks + ordered := true + for _, b := range c.blocks { + if b.mint < lastMax { + ordered = false + } + lastMax = b.maxt + } + + if ordered { + return nil + } + + // Otherwise, we need to rebuild the blocks + from, to := c.Bounds() + newC, err := c.Rebound(from, to) + if err != nil { + return err + } + *c = *newC.(*MemChunk) + return nil } // cut a new block and add it to finished blocks. 
@@ -712,14 +773,28 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1) var headIterator iter.EntryIterator + var lastMax int64 // placeholder to check order across blocks + ordered := true for _, b := range c.blocks { + + // skip this block if maxt < b.mint || b.maxt < mint { continue } + + if b.mint < lastMax { + ordered = false + } + lastMax = b.maxt + blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline)) } if !c.head.IsEmpty() { + from, _ := c.head.Bounds() + if from < lastMax { + ordered = false + } headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline) } @@ -728,8 +803,16 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if headIterator != nil { blockItrs = append(blockItrs, headIterator) } + + var it iter.EntryIterator + if ordered { + it = iter.NewNonOverlappingIterator(blockItrs, "") + } else { + it = iter.NewHeapIterator(ctx, blockItrs, direction) + } + return iter.NewTimeRangedIterator( - iter.NewNonOverlappingIterator(blockItrs, ""), + it, time.Unix(0, mint), time.Unix(0, maxt), ), nil @@ -755,7 +838,11 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi blockItrs[i], blockItrs[j] = blockItrs[j], blockItrs[i] } - return iter.NewNonOverlappingIterator(blockItrs, ""), nil + if ordered { + return iter.NewNonOverlappingIterator(blockItrs, ""), nil + } + return iter.NewHeapIterator(ctx, blockItrs, direction), nil + } // Iterator implements Chunk. 
@@ -763,19 +850,38 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, mint, maxt := from.UnixNano(), through.UnixNano() its := make([]iter.SampleIterator, 0, len(c.blocks)+1) + var lastMax int64 // placeholder to check order across blocks + ordered := true for _, b := range c.blocks { + // skip this block if maxt < b.mint || b.maxt < mint { continue } + + if b.mint < lastMax { + ordered = false + } + lastMax = b.maxt its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor)) } if !c.head.IsEmpty() { + from, _ := c.head.Bounds() + if from < lastMax { + ordered = false + } its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor)) } + var it iter.SampleIterator + if ordered { + it = iter.NewNonOverlappingSampleIterator(its, "") + } else { + it = iter.NewHeapSampleIterator(ctx, its) + } + return iter.NewTimeRangedSampleIterator( - iter.NewNonOverlappingSampleIterator(its, ""), + it, mint, maxt, ) @@ -802,10 +908,18 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) { return nil, err } - // Using defaultBlockSize for target block size. - // The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity. - // For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that. - newChunk := NewMemChunk(c.Encoding(), defaultBlockSize, c.CompressedSize()) + var newChunk *MemChunk + // Respect the specified block/target sizes as closely as possible. However, + // if the blockSize is not set, use reasonable defaults. + if c.blockSize > 0 { + newChunk = NewMemChunk(c.Encoding(), c.headFmt, c.blockSize, c.targetSize) + } else { + // Using defaultBlockSize for target block size. 
+ // The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity. + // For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that. + newChunk = NewMemChunk(c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize()) + + } for itr.Next() { entry := itr.Entry() diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index fb6173f5fca2..0d3b8b94a796 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -51,8 +51,10 @@ var ( }() ) +const DefaultHeadBlockFmt = OrderedHeadBlockFmt + func TestBlocksInclusive(t *testing.T) { - chk := NewMemChunk(EncNone, testBlockSize, testTargetSize) + chk := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize) err := chk.Append(logprotoEntry(1, "1")) require.Nil(t, err) err = chk.cut() @@ -68,7 +70,7 @@ func TestBlock(t *testing.T) { t.Run(enc.String(), func(t *testing.T) { t.Parallel() - chk := NewMemChunk(enc, testBlockSize, testTargetSize) + chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) cases := []struct { ts int64 str string @@ -178,7 +180,7 @@ func TestBlock(t *testing.T) { func TestReadFormatV1(t *testing.T) { t.Parallel() - c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) + c := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize) fillChunk(c) // overrides default v2 format c.format = chunkFormatV1 @@ -213,179 +215,184 @@ func TestReadFormatV1(t *testing.T) { // 1) memory populated chunks <-> []byte loaded chunks // 2) []byte loaded chunks <-> []byte loaded chunks func TestRoundtripV2(t *testing.T) { - for _, enc := range testEncoding { - for _, version := range []byte{chunkFormatV2, chunkFormatV3} { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding 
{ + for _, version := range []byte{chunkFormatV2, chunkFormatV3} { + t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + c.format = version + populated := fillChunk(c) + + assertLines := func(c *MemChunk) { + require.Equal(t, enc, c.Encoding()) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + if err != nil { + t.Fatal(err) + } + + i := int64(0) + var data int64 + for it.Next() { + require.Equal(t, i, it.Entry().Timestamp.UnixNano()) + require.Equal(t, testdata.LogString(i), it.Entry().Line) + + data += int64(len(it.Entry().Line)) + i++ + } + require.Equal(t, populated, data) + } - c := NewMemChunk(enc, testBlockSize, testTargetSize) - c.format = version - populated := fillChunk(c) + assertLines(c) - assertLines := func(c *MemChunk) { - require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + // test MemChunk -> NewByteChunk loading + b, err := c.Bytes() if err != nil { t.Fatal(err) } - i := int64(0) - var data int64 - for it.Next() { - require.Equal(t, i, it.Entry().Timestamp.UnixNano()) - require.Equal(t, testdata.LogString(i), it.Entry().Line) - - data += int64(len(it.Entry().Line)) - i++ + r, err := NewByteChunk(b, testBlockSize, testTargetSize) + if err != nil { + t.Fatal(err) } - require.Equal(t, populated, data) - } - - assertLines(c) - - // test MemChunk -> NewByteChunk loading - b, err := c.Bytes() - if err != nil { - t.Fatal(err) - } - - r, err := NewByteChunk(b, testBlockSize, testTargetSize) - if err != nil { - t.Fatal(err) - } - assertLines(r) + assertLines(r) - // test NewByteChunk -> NewByteChunk loading - rOut, err := r.Bytes() - require.Nil(t, err) + // test NewByteChunk -> NewByteChunk loading + rOut, err := r.Bytes() + require.Nil(t, err) - loaded, err := NewByteChunk(rOut, testBlockSize, 
testTargetSize) - require.Nil(t, err) + loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize) + require.Nil(t, err) - assertLines(loaded) - }) + assertLines(loaded) + }) + } } } } func TestRoundtripV3(t *testing.T) { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() - - c := NewMemChunk(enc, testBlockSize, testTargetSize) - c.format = chunkFormatV3 - _ = fillChunk(c) + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding { + t.Run(fmt.Sprintf("%v-%v", f, enc), func(t *testing.T) { + t.Parallel() - b, err := c.Bytes() - require.Nil(t, err) - r, err := NewByteChunk(b, testBlockSize, testTargetSize) - require.Nil(t, err) + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + c.format = chunkFormatV3 + _ = fillChunk(c) - // have to populate then clear the head block or fail comparing against nil vs zero values - err = r.head.Append(1, "1") - require.Nil(t, err) - r.head.Reset() + b, err := c.Bytes() + require.Nil(t, err) + r, err := NewByteChunk(b, testBlockSize, testTargetSize) + require.Nil(t, err) - require.Equal(t, c, r) - }) + b2, err := r.Bytes() + require.Nil(t, err) + require.Equal(t, b, b2) + }) + } } } func TestSerialization(t *testing.T) { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + t.Parallel() - chk := NewMemChunk(enc, testBlockSize, testTargetSize) + chk := NewMemChunk(enc, f, testBlockSize, testTargetSize) - numSamples := 50000 + numSamples := 50000 - for i := 0; i < numSamples; i++ { - require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i)))) - } - require.NoError(t, chk.Close()) + for i := 0; i < numSamples; i++ { + require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i)))) + } + require.NoError(t, chk.Close()) - byt, err := chk.Bytes() - require.NoError(t, err) + byt, err := 
chk.Bytes() + require.NoError(t, err) - bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) - require.NoError(t, err) + bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) + require.NoError(t, err) - it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) - require.NoError(t, err) - for i := 0; i < numSamples; i++ { - require.True(t, it.Next()) + it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + require.NoError(t, err) + for i := 0; i < numSamples; i++ { + require.True(t, it.Next()) - e := it.Entry() - require.Equal(t, int64(i), e.Timestamp.UnixNano()) - require.Equal(t, strconv.Itoa(i), e.Line) - } - require.NoError(t, it.Error()) + e := it.Entry() + require.Equal(t, int64(i), e.Timestamp.UnixNano()) + require.Equal(t, strconv.Itoa(i), e.Line) + } + require.NoError(t, it.Error()) - sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) - for i := 0; i < numSamples; i++ { - require.True(t, sampleIt.Next(), i) + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + for i := 0; i < numSamples; i++ { + require.True(t, sampleIt.Next(), i) - s := sampleIt.Sample() - require.Equal(t, int64(i), s.Timestamp) - require.Equal(t, 1., s.Value) - } - require.NoError(t, sampleIt.Error()) + s := sampleIt.Sample() + require.Equal(t, int64(i), s.Timestamp) + require.Equal(t, 1., s.Value) + } + require.NoError(t, sampleIt.Error()) - byt2, err := chk.Bytes() - require.NoError(t, err) + byt2, err := chk.Bytes() + require.NoError(t, err) - require.True(t, bytes.Equal(byt, byt2)) - }) + require.True(t, bytes.Equal(byt, byt2)) + }) + } } } func TestChunkFilling(t *testing.T) { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() + for _, f := range 
HeadBlockFmts { + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + t.Parallel() - chk := NewMemChunk(enc, testBlockSize, 0) - chk.blockSize = 1024 + chk := NewMemChunk(enc, f, testBlockSize, 0) + chk.blockSize = 1024 - // We should be able to append only 10KB of logs. - maxBytes := chk.blockSize * blocksPerChunk - lineSize := 512 - lines := maxBytes / lineSize + // We should be able to append only 10KB of logs. + maxBytes := chk.blockSize * blocksPerChunk + lineSize := 512 + lines := maxBytes / lineSize - logLine := string(make([]byte, lineSize)) - entry := &logproto.Entry{ - Timestamp: time.Unix(0, 0), - Line: logLine, - } + logLine := string(make([]byte, lineSize)) + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: logLine, + } - i := int64(0) - for ; chk.SpaceFor(entry) && i < 30; i++ { - entry.Timestamp = time.Unix(0, i) - require.NoError(t, chk.Append(entry)) - } + i := int64(0) + for ; chk.SpaceFor(entry) && i < 30; i++ { + entry.Timestamp = time.Unix(0, i) + require.NoError(t, chk.Append(entry)) + } - require.Equal(t, int64(lines), i) + require.Equal(t, int64(lines), i) - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) - require.NoError(t, err) - i = 0 - for it.Next() { - entry := it.Entry() - require.Equal(t, i, entry.Timestamp.UnixNano()) - i++ - } + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) + require.NoError(t, err) + i = 0 + for it.Next() { + entry := it.Entry() + require.Equal(t, i, entry.Timestamp.UnixNano()) + i++ + } - require.Equal(t, int64(lines), i) - }) + require.Equal(t, int64(lines), i) + }) + } } } func TestGZIPChunkTargetSize(t *testing.T) { t.Parallel() - chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) + chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize) lineSize := 512 entry := &logproto.Entry{ @@ -443,14 
+450,22 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + if chk.headFmt == OrderedHeadBlockFmt { + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + } else { + assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + } }, "append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) { assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) assert.NoError(t, chk.cut()) - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + if chk.headFmt == OrderedHeadBlockFmt { + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + } else { + assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + } }, "append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) { assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) @@ -459,40 +474,48 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) assert.NoError(t, chk.cut()) - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + if chk.headFmt == OrderedHeadBlockFmt { + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + } else { + assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + } }, } - for testName, tester := range tests { - tester := tester + for _, f := range HeadBlockFmts { + for testName, tester := range tests { + tester := tester - t.Run(testName, func(t *testing.T) { - t.Parallel() + t.Run(testName, func(t *testing.T) { + t.Parallel() - tester(t, NewMemChunk(EncGZIP, testBlockSize, testTargetSize)) - }) + tester(t, NewMemChunk(EncGZIP, f, testBlockSize, testTargetSize)) + }) 
+ } } } func TestChunkSize(t *testing.T) { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - t.Parallel() - c := NewMemChunk(enc, testBlockSize, testTargetSize) - inserted := fillChunk(c) - b, err := c.Bytes() - if err != nil { - t.Fatal(err) - } - t.Log("Chunk size", humanize.Bytes(uint64(len(b)))) - t.Log("characters ", humanize.Bytes(uint64(inserted))) - t.Log("Ratio", float64(inserted)/float64(len(b))) - }) + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + inserted := fillChunk(c) + b, err := c.Bytes() + if err != nil { + t.Fatal(err) + } + t.Log("Chunk size", humanize.Bytes(uint64(len(b)))) + t.Log("characters ", humanize.Bytes(uint64(inserted))) + t.Log("Ratio", float64(inserted)/float64(len(b))) + }) + } } } func TestChunkStats(t *testing.T) { - c := NewMemChunk(EncSnappy, testBlockSize, 0) + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, 0) first := time.Now() entry := &logproto.Entry{ Timestamp: first, @@ -559,42 +582,44 @@ func TestChunkStats(t *testing.T) { } func TestIteratorClose(t *testing.T) { - for _, enc := range testEncoding { - t.Run(enc.String(), func(t *testing.T) { - for _, test := range []func(iter iter.EntryIterator, t *testing.T){ - func(iter iter.EntryIterator, t *testing.T) { - // close without iterating - if err := iter.Close(); err != nil { - t.Fatal(err) - } - }, - func(iter iter.EntryIterator, t *testing.T) { - // close after iterating - for iter.Next() { + for _, f := range HeadBlockFmts { + for _, enc := range testEncoding { + t.Run(enc.String(), func(t *testing.T) { + for _, test := range []func(iter iter.EntryIterator, t *testing.T){ + func(iter iter.EntryIterator, t *testing.T) { + // close without iterating + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + func(iter iter.EntryIterator, t *testing.T) { + // close after iterating 
+ for iter.Next() { + _ = iter.Entry() + } + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + func(iter iter.EntryIterator, t *testing.T) { + // close after a single iteration + iter.Next() _ = iter.Entry() - } - if err := iter.Close(); err != nil { - t.Fatal(err) - } - }, - func(iter iter.EntryIterator, t *testing.T) { - // close after a single iteration - iter.Next() - _ = iter.Entry() - if err := iter.Close(); err != nil { + if err := iter.Close(); err != nil { + t.Fatal(err) + } + }, + } { + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + inserted := fillChunk(c) + iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) + if err != nil { t.Fatal(err) } - }, - } { - c := NewMemChunk(enc, testBlockSize, testTargetSize) - inserted := fillChunk(c) - iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) - if err != nil { - t.Fatal(err) + test(iter, t) } - test(iter, t) - } - }) + }) + } } } @@ -609,21 +634,24 @@ func BenchmarkWrite(b *testing.B) { } i := int64(0) - for _, enc := range testEncoding { - b.Run(enc.String(), func(b *testing.B) { - for n := 0; n < b.N; n++ { - c := NewMemChunk(enc, testBlockSize, testTargetSize) - // adds until full so we trigger cut which serialize using gzip - for c.SpaceFor(entry) { - _ = c.Append(entry) - entry.Timestamp = time.Unix(0, i) - entry.Line = testdata.LogString(i) - i++ + for _, f := range HeadBlockFmts { + + for _, enc := range testEncoding { + b.Run(fmt.Sprintf("%v-%v", f, enc), func(b *testing.B) { + for n := 0; n < b.N; n++ { + c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + // adds until full so we trigger cut which serialize using gzip + for c.SpaceFor(entry) { + _ = c.Append(entry) + entry.Timestamp = time.Unix(0, i) + entry.Line = testdata.LogString(i) + i++ + } + chunks = append(chunks, c) } - chunks = append(chunks, c) - } - result = chunks 
- }) + result = chunks + }) + } } } @@ -686,7 +714,7 @@ func BenchmarkRead(b *testing.B) { func BenchmarkBackwardIterator(b *testing.B) { b.ReportAllocs() - c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize) _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -781,7 +809,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { func TestMemChunk_IteratorBounds(t *testing.T) { createChunk := func() *MemChunk { t.Helper() - c := NewMemChunk(EncNone, 1e6, 1e6) + c := NewMemChunk(EncNone, DefaultHeadBlockFmt, 1e6, 1e6) if err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), @@ -852,7 +880,7 @@ func TestMemchunkLongLine(t *testing.T) { t.Run(enc.String(), func(t *testing.T) { t.Parallel() - c := NewMemChunk(enc, testBlockSize, testTargetSize) + c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) for i := 1; i <= 10; i++ { require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) } @@ -870,80 +898,58 @@ func TestMemchunkLongLine(t *testing.T) { func TestBytesWith(t *testing.T) { t.Parallel() - exp, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(nil) + exp, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) require.Nil(t, err) - out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) + out, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) require.Nil(t, err) require.Equal(t, exp, out) } -func TestHeadBlockCheckpointing(t *testing.T) { - t.Parallel() - - c := NewMemChunk(EncSnappy, 256*1024, 1500*1024) - - // add a few entries - for i := 0; i < 5; i++ { - entry := &logproto.Entry{ - Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf("hi there - %d", i), - } - require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, 
c.Append(entry)) - } - - // ensure blocks are not cut - require.Equal(t, 0, len(c.blocks)) - - b, err := c.head.CheckpointBytes(nil) - require.Nil(t, err) - - hb := &headBlock{} - require.Nil(t, hb.LoadBytes(b)) - require.Equal(t, c.head, hb) -} - func TestCheckpointEncoding(t *testing.T) { t.Parallel() blockSize, targetSize := 256*1024, 1500*1024 - c := NewMemChunk(EncSnappy, blockSize, targetSize) - - // add a few entries - for i := 0; i < 5; i++ { - entry := &logproto.Entry{ - Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf("hi there - %d", i), - } - require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) - } + for _, f := range HeadBlockFmts { + t.Run(f.String(), func(t *testing.T) { + c := NewMemChunk(EncSnappy, f, blockSize, targetSize) + + // add a few entries + for i := 0; i < 5; i++ { + entry := &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("hi there - %d", i), + } + require.Equal(t, true, c.SpaceFor(entry)) + require.Nil(t, c.Append(entry)) + } - // cut it - require.Nil(t, c.cut()) + // cut it + require.Nil(t, c.cut()) - // add a few more to head - for i := 5; i < 10; i++ { - entry := &logproto.Entry{ - Timestamp: time.Unix(int64(i), 0), - Line: fmt.Sprintf("hi there - %d", i), - } - require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) - } + // add a few more to head + for i := 5; i < 10; i++ { + entry := &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("hi there - %d", i), + } + require.Equal(t, true, c.SpaceFor(entry)) + require.Nil(t, c.Append(entry)) + } - // ensure new blocks are not cut - require.Equal(t, 1, len(c.blocks)) + // ensure new blocks are not cut + require.Equal(t, 1, len(c.blocks)) - var chk, head bytes.Buffer - err := c.SerializeForCheckpointTo(&chk, &head) - require.Nil(t, err) + var chk, head bytes.Buffer + err := c.SerializeForCheckpointTo(&chk, &head) + require.Nil(t, err) - cpy, err := MemchunkFromCheckpoint(chk.Bytes(), 
head.Bytes(), blockSize, targetSize) - require.Nil(t, err) + cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), f, blockSize, targetSize) + require.Nil(t, err) - require.Equal(t, c, cpy) + require.Equal(t, c, cpy) + }) + } } var ( @@ -952,131 +958,140 @@ var ( ) func BenchmarkBufferedIteratorLabels(b *testing.B) { - c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) - _ = fillChunk(c) - labelsSet := []labels.Labels{ - { - {Name: "cluster", Value: "us-central1"}, - {Name: "stream", Value: "stdout"}, - {Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, - {Name: "namespace", Value: "loki-dev"}, - {Name: "job", Value: "loki-prod/query-frontend"}, - {Name: "container", Value: "query-frontend"}, - {Name: "pod", Value: "query-frontend-6894f97b98-89q2n"}, - }, - { - {Name: "cluster", Value: "us-central2"}, - {Name: "stream", Value: "stderr"}, - {Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, - {Name: "namespace", Value: "loki-dev"}, - {Name: "job", Value: "loki-prod/querier"}, - {Name: "container", Value: "querier"}, - {Name: "pod", Value: "querier-6894f97b98-89q2n"}, - }, - } - for _, test := range []string{ - `{app="foo"}`, - `{app="foo"} != "foo"`, - `{app="foo"} != "foo" | logfmt `, - `{app="foo"} != "foo" | logfmt | duration > 10ms`, - `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, - } { - b.Run(test, func(b *testing.B) { - b.ReportAllocs() - expr, err := logql.ParseLogSelector(test, true) - if err != nil { - b.Fatal(err) - } - p, err := expr.Pipeline() - if err != nil { - b.Fatal(err) - } - var iters []iter.EntryIterator - for _, lbs := range labelsSet { - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs)) - if err != nil { - b.Fatal(err) - } - iters = append(iters, it) + for _, f := 
range HeadBlockFmts { + b.Run(f.String(), func(b *testing.B) { + c := NewMemChunk(EncSnappy, f, testBlockSize, testTargetSize) + _ = fillChunk(c) + + labelsSet := []labels.Labels{ + { + {Name: "cluster", Value: "us-central1"}, + {Name: "stream", Value: "stdout"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/query-frontend"}, + {Name: "container", Value: "query-frontend"}, + {Name: "pod", Value: "query-frontend-6894f97b98-89q2n"}, + }, + { + {Name: "cluster", Value: "us-central2"}, + {Name: "stream", Value: "stderr"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/querier"}, + {Name: "container", Value: "querier"}, + {Name: "pod", Value: "querier-6894f97b98-89q2n"}, + }, } - b.ResetTimer() - for n := 0; n < b.N; n++ { - for _, it := range iters { - for it.Next() { - streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}}) + for _, test := range []string{ + `{app="foo"}`, + `{app="foo"} != "foo"`, + `{app="foo"} != "foo" | logfmt `, + `{app="foo"} != "foo" | logfmt | duration > 10ms`, + `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseLogSelector(test, true) + if err != nil { + b.Fatal(err) } - } + p, err := expr.Pipeline() + if err != nil { + b.Fatal(err) + } + var iters []iter.EntryIterator + for _, lbs := range labelsSet { + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs)) + if err != nil { + b.Fatal(err) + } + iters = append(iters, it) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { 
+ for it.Next() { + streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}}) + } + } + } + streams = streams[:0] + }) } - streams = streams[:0] - }) - } - for _, test := range []string{ - `rate({app="foo"}[1m])`, - `sum by (cluster) (rate({app="foo"}[10s]))`, - `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`, - `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, - `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, - `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, - `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, - } { - b.Run(test, func(b *testing.B) { - b.ReportAllocs() - expr, err := logql.ParseSampleExpr(test) - if err != nil { - b.Fatal(err) - } - ex, err := expr.Extractor() - if err != nil { - b.Fatal(err) - } - var iters []iter.SampleIterator - for _, lbs := range labelsSet { - iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs))) - } - b.ResetTimer() - for n := 0; n < b.N; n++ { - for _, it := range iters { - for it.Next() { - series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}}) + for _, test := range []string{ + `rate({app="foo"}[1m])`, + `sum by (cluster) (rate({app="foo"}[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseSampleExpr(test) + if err != nil { + b.Fatal(err) } - } + ex, err := expr.Extractor() + if err != nil { + b.Fatal(err) + } + var iters []iter.SampleIterator + for _, 
lbs := range labelsSet { + iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs))) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { + for it.Next() { + series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}}) + } + } + } + series = series[:0] + }) } - series = series[:0] }) } } func Test_HeadIteratorReverse(t *testing.T) { - c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) - genEntry := func(i int64) *logproto.Entry { - return &logproto.Entry{ - Timestamp: time.Unix(0, i), - Line: fmt.Sprintf(`msg="%d"`, i), - } - } - var i int64 - for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 { - require.NoError(t, c.Append(e)) - } + for _, f := range HeadBlockFmts { + t.Run(f.String(), func(t *testing.T) { + c := NewMemChunk(EncSnappy, f, testBlockSize, testTargetSize) + genEntry := func(i int64) *logproto.Entry { + return &logproto.Entry{ + Timestamp: time.Unix(0, i), + Line: fmt.Sprintf(`msg="%d"`, i), + } + } + var i int64 + for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 { + require.NoError(t, c.Append(e)) + } - assertOrder := func(t *testing.T, total int64) { - expr, err := logql.ParseLogSelector(`{app="foo"} | logfmt`, true) - require.NoError(t, err) - p, err := expr.Pipeline() - require.NoError(t, err) - it, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(0, i), logproto.BACKWARD, p.ForStream(labels.Labels{{Name: "app", Value: "foo"}})) - require.NoError(t, err) - for it.Next() { - total-- - require.Equal(t, total, it.Entry().Timestamp.UnixNano()) - } - } + assertOrder := func(t *testing.T, total int64) { + expr, err := logql.ParseLogSelector(`{app="foo"} | logfmt`, true) + require.NoError(t, err) + p, err := expr.Pipeline() + require.NoError(t, err) + it, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(0, i), logproto.BACKWARD, p.ForStream(labels.Labels{{Name: "app", Value: 
"foo"}})) + require.NoError(t, err) + for it.Next() { + total-- + require.Equal(t, total, it.Entry().Timestamp.UnixNano()) + } + } - assertOrder(t, i) - // let's try again without the headblock. - require.NoError(t, c.cut()) - assertOrder(t, i) + assertOrder(t, i) + // let's try again without the headblock. + require.NoError(t, c.cut()) + assertOrder(t, i) + }) + } } func TestMemChunk_Rebound(t *testing.T) { @@ -1159,7 +1174,7 @@ func TestMemChunk_Rebound(t *testing.T) { } func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { - chk := NewMemChunk(EncGZIP, defaultBlockSize, 0) + chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { err := chk.Append(&logproto.Entry{ Line: from.String(), diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 924ca0ace93a..6de044dad169 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -361,7 +361,7 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { if version > OrderedHeadBlockFmt { return hb, nil } - out := &headBlock{} + out := version.NewBlock() err := hb.forEntries( context.Background(), @@ -455,9 +455,8 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { if db.err() != nil { return errors.Wrap(db.err(), "verifying headblock header") } - switch version { - case UnorderedHeadBlockFmt.Byte(): - default: + + if version != UnorderedHeadBlockFmt.Byte() { return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version) } @@ -487,6 +486,10 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { // This is particularly helpful replaying WALs from different configurations // such as after enabling unordered writes. 
func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { + if len(b) == 0 { + return desired.NewBlock(), nil + } + db := decbuf{b: b} version := db.byte() @@ -494,17 +497,11 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { return nil, errors.Wrap(db.err(), "verifying headblock header") } format := HeadBlockFmt(version) - - var decodedBlock HeadBlock - switch { - case format <= OrderedHeadBlockFmt: - decodedBlock = &headBlock{} - case format == UnorderedHeadBlockFmt: - decodedBlock = newUnorderedHeadBlock() - default: - return nil, fmt.Errorf("unexpected head block version: %v", version) + if format > UnorderedHeadBlockFmt { + return nil, fmt.Errorf("unexpected head block version: %v", format) } + decodedBlock := format.NewBlock() if err := decodedBlock.LoadBytes(b); err != nil { return nil, err } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index a8fe29740ee7..447df07e917f 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/log" ) func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { @@ -331,3 +332,281 @@ func BenchmarkHeadBlockWrites(b *testing.B) { }) } } + +func TestUnorderedChunkIterators(t *testing.T) { + c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + for i := 0; i < 100; i++ { + // push in reverse order + require.Nil(t, c.Append(&logproto.Entry{ + Timestamp: time.Unix(int64(99-i), 0), + Line: fmt.Sprint(99 - i), + })) + + // ensure we have a mix of cut blocks + head block. 
+ if i%30 == 0 { + require.Nil(t, c.cut()) + } + } + + // ensure head block has data + require.Equal(t, false, c.head.IsEmpty()) + + forward, err := c.Iterator( + context.Background(), + time.Unix(0, 0), + time.Unix(100, 0), + logproto.FORWARD, + noopStreamPipeline, + ) + require.Nil(t, err) + + backward, err := c.Iterator( + context.Background(), + time.Unix(0, 0), + time.Unix(100, 0), + logproto.BACKWARD, + noopStreamPipeline, + ) + require.Nil(t, err) + + smpl := c.SampleIterator( + context.Background(), + time.Unix(0, 0), + time.Unix(100, 0), + countExtractor, + ) + + for i := 0; i < 100; i++ { + require.Equal(t, true, forward.Next()) + require.Equal(t, true, backward.Next()) + require.Equal(t, true, smpl.Next()) + require.Equal(t, time.Unix(int64(i), 0), forward.Entry().Timestamp) + require.Equal(t, time.Unix(int64(99-i), 0), backward.Entry().Timestamp) + require.Equal(t, float64(1), smpl.Sample().Value) + require.Equal(t, time.Unix(int64(i), 0).UnixNano(), smpl.Sample().Timestamp) + } + require.Equal(t, false, forward.Next()) + require.Equal(t, false, backward.Next()) +} + +func BenchmarkUnorderedRead(b *testing.B) { + legacy := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) + fillChunkClose(legacy, false) + ordered := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + fillChunkClose(ordered, false) + unordered := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + fillChunkRandomOrder(unordered, false) + + tcs := []struct { + desc string + c *MemChunk + }{ + { + desc: "ordered+legacy hblock", + c: legacy, + }, + { + desc: "ordered+unordered hblock", + c: ordered, + }, + { + desc: "unordered+unordered hblock", + c: unordered, + }, + } + + b.Run("itr", func(b *testing.B) { + for _, tc := range tcs { + b.Run(tc.desc, func(b *testing.B) { + for n := 0; n < b.N; n++ { + iterator, err := tc.c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), 
logproto.FORWARD, noopStreamPipeline) + if err != nil { + panic(err) + } + for iterator.Next() { + _ = iterator.Entry() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } + }) + } + }) + + b.Run("smpl", func(b *testing.B) { + for _, tc := range tcs { + b.Run(tc.desc, func(b *testing.B) { + for n := 0; n < b.N; n++ { + iterator := tc.c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + for iterator.Next() { + _ = iterator.Sample() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } + }) + } + }) + +} + +func TestUnorderedIteratorCountsAllEntries(t *testing.T) { + c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + fillChunkRandomOrder(c, false) + + ct := 0 + var i int64 + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) + if err != nil { + panic(err) + } + for iterator.Next() { + next := iterator.Entry().Timestamp.UnixNano() + require.GreaterOrEqual(t, next, i) + i = next + ct++ + } + if err := iterator.Close(); err != nil { + t.Fatal(err) + } + require.Equal(t, c.Size(), ct) + + ct = 0 + i = 0 + smpl := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) + for smpl.Next() { + next := smpl.Sample().Timestamp + require.GreaterOrEqual(t, next, i) + i = next + ct += int(smpl.Sample().Value) + } + require.Equal(t, c.Size(), ct) + + if err := iterator.Close(); err != nil { + t.Fatal(err) + } +} + +func chunkFrom(xs []logproto.Entry) ([]byte, error) { + c := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) + for _, x := range xs { + if err := c.Append(&x); err != nil { + return nil, err + } + } + + if err := c.Close(); err != nil { + return nil, err + } + return c.Bytes() +} + +func TestReorder(t *testing.T) { + for _, tc := range []struct { + desc string + input []logproto.Entry + expected 
[]logproto.Entry + }{ + { + desc: "unordered", + input: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "x", + }, + { + Timestamp: time.Unix(2, 0), + Line: "x", + }, + { + Timestamp: time.Unix(3, 0), + Line: "x", + }, + { + Timestamp: time.Unix(1, 0), + Line: "x", + }, + }, + expected: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "x", + }, + { + Timestamp: time.Unix(2, 0), + Line: "x", + }, + { + Timestamp: time.Unix(3, 0), + Line: "x", + }, + { + Timestamp: time.Unix(4, 0), + Line: "x", + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + for _, x := range tc.input { + require.Nil(t, c.Append(&x)) + } + require.Nil(t, c.Close()) + b, err := c.Bytes() + require.Nil(t, err) + + exp, err := chunkFrom(tc.expected) + require.Nil(t, err) + + require.Equal(t, exp, b) + }) + + } +} + +func TestReorderAcrossBlocks(t *testing.T) { + c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + for _, batch := range [][]int{ + // ensure our blocks have overlapping bounds and must be reordered + // before closing. 
+ {1, 5}, + {3, 7}, + } { + for _, x := range batch { + require.Nil(t, c.Append(&logproto.Entry{ + Timestamp: time.Unix(int64(x), 0), + Line: fmt.Sprint(x), + })) + } + require.Nil(t, c.cut()) + } + // get bounds before it's reordered + from, to := c.Bounds() + require.Nil(t, c.Close()) + + itr, err := c.Iterator(context.Background(), from, to.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) + require.Nil(t, err) + + exp := []entry{ + { + t: time.Unix(1, 0).UnixNano(), + s: "1", + }, + { + t: time.Unix(3, 0).UnixNano(), + s: "3", + }, + { + t: time.Unix(5, 0).UnixNano(), + s: "5", + }, + { + t: time.Unix(7, 0).UnixNano(), + s: "7", + }, + } + iterEq(t, exp, itr) +} diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 25e5c455928b..7618a8ae5eb8 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -1,6 +1,7 @@ package chunkenc import ( + "math/rand" "time" "github.com/grafana/loki/pkg/chunkenc/testdata" @@ -21,7 +22,7 @@ func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) { for n := 0; n < chunksCount; n++ { entry := logprotoEntry(0, testdata.LogString(0)) - c := NewMemChunk(enc, testBlockSize, testTargetSize) + c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize) for c.SpaceFor(entry) { size += uint64(len(entry.Line)) _ = c.Append(entry) @@ -35,6 +36,10 @@ func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) { } func fillChunk(c Chunk) int64 { + return fillChunkClose(c, true) +} + +func fillChunkClose(c Chunk, close bool) int64 { i := int64(0) inserted := int64(0) entry := &logproto.Entry{ @@ -52,6 +57,31 @@ func fillChunk(c Chunk) int64 { entry.Line = testdata.LogString(i) } - _ = c.Close() + if close { + _ = c.Close() + } return inserted } + +func fillChunkRandomOrder(c Chunk, close bool) { + ub := int64(1 << 30) + i := int64(0) + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: testdata.LogString(i), + } + + for c.SpaceFor(entry) 
{ + err := c.Append(entry) + if err != nil { + panic(err) + } + i++ + entry.Timestamp = time.Unix(0, rand.Int63n(ub)) + entry.Line = testdata.LogString(i) + + } + if close { + _ = c.Close() + } +} diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index a0cbbda85097..87c0b1e09eda 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -100,7 +100,11 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) { lastUpdated: c.LastUpdated, } - mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, conf.BlockSize, conf.TargetChunkSize) + hbType := chunkenc.OrderedHeadBlockFmt + if conf.UnorderedWrites { + hbType = chunkenc.UnorderedHeadBlockFmt + } + mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize) if err != nil { return nil, err } @@ -273,6 +277,7 @@ func (s *streamIterator) Next() bool { s.current.To = stream.lastLine.ts s.current.LastLine = stream.lastLine.content s.current.EntryCt = stream.entryCt + s.current.HighestTs = stream.highestTs return true } diff --git a/pkg/ingester/checkpoint.pb.go b/pkg/ingester/checkpoint.pb.go index bcb0871cee87..0a9184403f2d 100644 --- a/pkg/ingester/checkpoint.pb.go +++ b/pkg/ingester/checkpoint.pb.go @@ -141,12 +141,15 @@ type Series struct { Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter" json:"labels"` Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` - // Last timestamp of the last chunk. - To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"` - LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"` + // most recently pushed timestamp. 
+ To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"` + // most recently pushed line. + LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"` // highest counter value for pushes to this stream. // Used to skip already applied entries during WAL replay. EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"` + // highest timestamp pushed to this stream. + HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"` } func (m *Series) Reset() { *m = Series{} } @@ -223,6 +226,13 @@ func (m *Series) GetEntryCt() int64 { return 0 } +func (m *Series) GetHighestTs() time.Time { + if m != nil { + return m.HighestTs + } + return time.Time{} +} + func init() { proto.RegisterType((*Chunk)(nil), "loki_ingester.Chunk") proto.RegisterType((*Series)(nil), "loki_ingester.Series") @@ -231,39 +241,40 @@ func init() { func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) } var fileDescriptor_00f4b7152db9bdb5 = []byte{ - // 503 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x3d, 0x8f, 0xd3, 0x40, - 0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0xc6, 0xba, 0x2a, 0x0d, - 0xb6, 0x14, 0x28, 0xa0, 0x41, 0x8a, 0x0f, 0x21, 0x21, 0x5d, 0x81, 0x0c, 0x34, 0x34, 0xc8, 0x1f, - 0x1b, 0xdb, 0xc4, 0xf1, 0x5a, 0xbb, 0x6b, 0x89, 0xeb, 0xf8, 0x09, 0xf7, 0x33, 0xf8, 0x29, 0x57, - 0x46, 0x54, 0x27, 0x90, 0x0e, 0xe2, 0x34, 0x94, 0xf7, 0x13, 0xd0, 0xae, 0x6d, 0x2e, 0x94, 0xee, - 0xe6, 0xbd, 0x99, 0xe7, 0x79, 0x9e, 0x7d, 0xf0, 0x71, 0xb9, 0x49, 0xdc, 0xac, 0x48, 0xa8, 0x90, - 0x94, 0xbb, 0x51, 0x4a, 0xa3, 0x4d, 0xc9, 0xb2, 0x42, 0x3a, 0x25, 0x67, 0x92, 0xa1, 0xfb, 0x39, - 0xdb, 0x64, 0x9f, 0xba, 0xfe, 0x6c, 0x9e, 0x30, 0x96, 0xe4, 0xd4, 0xd5, 0xcd, 0xb0, 0x5a, 0xbb, - 0x32, 0xdb, 0x52, 0x21, 0x83, 0x6d, 0xd9, 0xcc, 0xcf, 0x9e, 0x24, 0x99, 0x4c, 0xab, 0xd0, 
0x89, - 0xd8, 0xd6, 0x4d, 0x58, 0xc2, 0xee, 0x26, 0x15, 0xd2, 0x40, 0x57, 0xed, 0xf8, 0x8b, 0xa3, 0xf1, - 0x88, 0x71, 0x49, 0xbf, 0x94, 0x9c, 0x7d, 0xa6, 0x91, 0x6c, 0x91, 0xab, 0xdc, 0xb5, 0x8d, 0xb0, - 0x2d, 0x1a, 0xe9, 0xe9, 0xcf, 0x01, 0x1c, 0x9d, 0xa5, 0x55, 0xb1, 0x41, 0xcf, 0xa1, 0xb9, 0xe6, - 0x6c, 0x8b, 0x81, 0x0d, 0x16, 0xd3, 0xe5, 0xcc, 0x69, 0x3c, 0x3a, 0xdd, 0x66, 0xe7, 0x7d, 0xe7, - 0xd1, 0x1b, 0x5f, 0xdd, 0xcc, 0x8d, 0xcb, 0x5f, 0x73, 0xe0, 0x6b, 0x05, 0x7a, 0x06, 0x07, 0x92, - 0xe1, 0x41, 0x0f, 0xdd, 0x40, 0x32, 0xe4, 0xc1, 0xc9, 0x3a, 0xaf, 0x44, 0x4a, 0xe3, 0x95, 0xc4, - 0xc3, 0x1e, 0xe2, 0x3b, 0x19, 0x7a, 0x0d, 0xa7, 0x79, 0x20, 0xe4, 0x87, 0x32, 0x0e, 0x24, 0x8d, - 0xb1, 0xd9, 0xe3, 0x2b, 0xc7, 0x42, 0xf4, 0x08, 0x5a, 0x51, 0xce, 0x04, 0x8d, 0xf1, 0xc8, 0x06, - 0x8b, 0xb1, 0xdf, 0x22, 0xc5, 0x8b, 0x8b, 0x22, 0xa2, 0x31, 0xb6, 0x1a, 0xbe, 0x41, 0x08, 0x41, - 0x33, 0x0e, 0x64, 0x80, 0x4f, 0x6c, 0xb0, 0xb8, 0xe7, 0xeb, 0x5a, 0x71, 0x29, 0x0d, 0x62, 0x3c, - 0x6e, 0x38, 0x55, 0x9f, 0x7e, 0x1f, 0x40, 0xeb, 0x1d, 0xe5, 0x19, 0x15, 0xea, 0x53, 0x95, 0xa0, - 0xfc, 0xcd, 0x2b, 0x7d, 0xe0, 0x89, 0xdf, 0x22, 0x64, 0xc3, 0xe9, 0x5a, 0x05, 0x83, 0x97, 0x3c, - 0x2b, 0xa4, 0xbe, 0xa2, 0xe9, 0x1f, 0x53, 0xa8, 0x80, 0x56, 0x1e, 0x84, 0x34, 0x17, 0x78, 0x68, - 0x0f, 0x17, 0xd3, 0xe5, 0x03, 0xa7, 0x7b, 0x4a, 0xe7, 0x5c, 0xf1, 0x6f, 0x83, 0x8c, 0x7b, 0x2b, - 0xf5, 0x63, 0x3f, 0x6e, 0xe6, 0xbd, 0xa2, 0xd0, 0xe8, 0x57, 0x71, 0x50, 0x4a, 0xca, 0xfd, 0x76, - 0x0b, 0x5a, 0x42, 0x2b, 0x52, 0x89, 0x10, 0xd8, 0xd4, 0xfb, 0x1e, 0x3a, 0xff, 0xa5, 0xd7, 0xd1, - 0x71, 0xf1, 0x4c, 0xb5, 0xd0, 0x6f, 0x27, 0xdb, 0x08, 0x8c, 0x7a, 0x46, 0x60, 0x06, 0xc7, 0xea, - 0x15, 0xce, 0xb3, 0x82, 0xea, 0x03, 0x4f, 0xfc, 0x7f, 0x18, 0x61, 0x78, 0x42, 0x0b, 0xc9, 0x2f, - 0xce, 0xa4, 0xbe, 0xf2, 0xd0, 0xef, 0xa0, 0xf7, 0x72, 0xb7, 0x27, 0xc6, 0xf5, 0x9e, 0x18, 0xb7, - 0x7b, 0x02, 0xbe, 0xd6, 0x04, 0x7c, 0xab, 0x09, 0xb8, 0xaa, 0x09, 0xd8, 0xd5, 0x04, 0xfc, 0xae, - 0x09, 0xf8, 0x53, 0x13, 0xe3, 
0xb6, 0x26, 0xe0, 0xf2, 0x40, 0x8c, 0xdd, 0x81, 0x18, 0xd7, 0x07, - 0x62, 0x7c, 0x1c, 0x77, 0xfe, 0x43, 0x4b, 0xfb, 0x7a, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x70, - 0x87, 0xe1, 0x9b, 0xb4, 0x03, 0x00, 0x00, + // 520 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x8e, 0xd3, 0x40, + 0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0x26, 0xba, 0x2a, 0x0d, + 0xb6, 0x14, 0x28, 0xa0, 0x41, 0x4a, 0x0e, 0x21, 0x21, 0x5d, 0x81, 0xcc, 0xd1, 0xd0, 0x20, 0xc7, + 0x9e, 0xd8, 0x26, 0x8e, 0xd7, 0xda, 0xdd, 0x48, 0x5c, 0xc7, 0x27, 0x5c, 0xc5, 0x37, 0xf0, 0x29, + 0x57, 0xa6, 0x3c, 0x81, 0x74, 0x10, 0xa7, 0xa1, 0xbc, 0x4f, 0x40, 0xbb, 0xb6, 0xef, 0x42, 0x77, + 0xe9, 0xe6, 0xbd, 0x99, 0xe7, 0x19, 0xbf, 0x7d, 0xf8, 0x69, 0xb1, 0x8c, 0xbd, 0x34, 0x8f, 0x41, + 0x2a, 0x10, 0x5e, 0x98, 0x40, 0xb8, 0x2c, 0x78, 0x9a, 0x2b, 0xb7, 0x10, 0x5c, 0x71, 0xf2, 0x30, + 0xe3, 0xcb, 0xf4, 0x73, 0xd3, 0x1f, 0x0c, 0x63, 0xce, 0xe3, 0x0c, 0x3c, 0xd3, 0x9c, 0xaf, 0x17, + 0x9e, 0x4a, 0x57, 0x20, 0x55, 0xb0, 0x2a, 0xaa, 0xf9, 0xc1, 0xb3, 0x38, 0x55, 0xc9, 0x7a, 0xee, + 0x86, 0x7c, 0xe5, 0xc5, 0x3c, 0xe6, 0x77, 0x93, 0x1a, 0x19, 0x60, 0xaa, 0x7a, 0xfc, 0xd5, 0xde, + 0x78, 0xc8, 0x85, 0x82, 0xaf, 0x85, 0xe0, 0x5f, 0x20, 0x54, 0x35, 0xf2, 0xf4, 0x75, 0x75, 0x63, + 0x5e, 0x17, 0x95, 0xf4, 0xf8, 0x57, 0x0b, 0x77, 0x4e, 0x92, 0x75, 0xbe, 0x24, 0x2f, 0xb1, 0xbd, + 0x10, 0x7c, 0x45, 0xd1, 0x08, 0x8d, 0xfb, 0x93, 0x81, 0x5b, 0xdd, 0xe8, 0x36, 0x9b, 0xdd, 0xb3, + 0xe6, 0xc6, 0x59, 0xf7, 0xf2, 0x7a, 0x68, 0x5d, 0xfc, 0x1e, 0x22, 0xdf, 0x28, 0xc8, 0x0b, 0xdc, + 0x52, 0x9c, 0xb6, 0x0e, 0xd0, 0xb5, 0x14, 0x27, 0x33, 0xdc, 0x5b, 0x64, 0x6b, 0x99, 0x40, 0x34, + 0x55, 0xb4, 0x7d, 0x80, 0xf8, 0x4e, 0x46, 0xde, 0xe2, 0x7e, 0x16, 0x48, 0xf5, 0xb1, 0x88, 0x02, + 0x05, 0x11, 0xb5, 0x0f, 0xf8, 0xca, 0xbe, 0x90, 0x3c, 0xc1, 0x4e, 0x98, 0x71, 0x09, 0x11, 0xed, + 0x8c, 0xd0, 0xb8, 0xeb, 0xd7, 0x48, 0xf3, 0xf2, 0x3c, 0x0f, 0x21, 0xa2, 
0x4e, 0xc5, 0x57, 0x88, + 0x10, 0x6c, 0x47, 0x81, 0x0a, 0xe8, 0xd1, 0x08, 0x8d, 0x1f, 0xf8, 0xa6, 0xd6, 0x5c, 0x02, 0x41, + 0x44, 0xbb, 0x15, 0xa7, 0xeb, 0xe3, 0xef, 0x6d, 0xec, 0x7c, 0x00, 0x91, 0x82, 0xd4, 0x9f, 0x5a, + 0x4b, 0x10, 0xef, 0xde, 0x18, 0x83, 0x7b, 0x7e, 0x8d, 0xc8, 0x08, 0xf7, 0x17, 0x3a, 0x18, 0xa2, + 0x10, 0x69, 0xae, 0x8c, 0x8b, 0xb6, 0xbf, 0x4f, 0x91, 0x1c, 0x3b, 0x59, 0x30, 0x87, 0x4c, 0xd2, + 0xf6, 0xa8, 0x3d, 0xee, 0x4f, 0x1e, 0xb9, 0xcd, 0x53, 0xba, 0xa7, 0x9a, 0x7f, 0x1f, 0xa4, 0x62, + 0x36, 0xd5, 0x3f, 0xf6, 0xf3, 0x7a, 0x78, 0x50, 0x14, 0x2a, 0xfd, 0x34, 0x0a, 0x0a, 0x05, 0xc2, + 0xaf, 0xb7, 0x90, 0x09, 0x76, 0x42, 0x9d, 0x08, 0x49, 0x6d, 0xb3, 0xef, 0xb1, 0xfb, 0x5f, 0x7a, + 0x5d, 0x13, 0x97, 0x99, 0xad, 0x17, 0xfa, 0xf5, 0x64, 0x1d, 0x81, 0xce, 0x81, 0x11, 0x18, 0xe0, + 0xae, 0x7e, 0x85, 0xd3, 0x34, 0x07, 0x63, 0x70, 0xcf, 0xbf, 0xc5, 0x84, 0xe2, 0x23, 0xc8, 0x95, + 0x38, 0x3f, 0x51, 0xc6, 0xe5, 0xb6, 0xdf, 0x40, 0x1d, 0x9c, 0x24, 0x8d, 0x13, 0x90, 0xea, 0x4c, + 0x1a, 0xb7, 0xef, 0x1d, 0x9c, 0x5b, 0xd9, 0xec, 0xf5, 0x66, 0xcb, 0xac, 0xab, 0x2d, 0xb3, 0x6e, + 0xb6, 0x0c, 0x7d, 0x2b, 0x19, 0xfa, 0x51, 0x32, 0x74, 0x59, 0x32, 0xb4, 0x29, 0x19, 0xfa, 0x53, + 0x32, 0xf4, 0xb7, 0x64, 0xd6, 0x4d, 0xc9, 0xd0, 0xc5, 0x8e, 0x59, 0x9b, 0x1d, 0xb3, 0xae, 0x76, + 0xcc, 0xfa, 0xd4, 0x6d, 0x3c, 0x98, 0x3b, 0x66, 0xd1, 0xf3, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x38, 0x4d, 0x9b, 0xd7, 0xf8, 0x03, 0x00, 0x00, } func (this *Chunk) Equal(that interface{}) bool { @@ -361,6 +372,9 @@ func (this *Series) Equal(that interface{}) bool { if this.EntryCt != that1.EntryCt { return false } + if !this.HighestTs.Equal(that1.HighestTs) { + return false + } return true } func (this *Chunk) GoString() string { @@ -384,7 +398,7 @@ func (this *Series) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 12) s = append(s, "&ingester.Series{") s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") s = append(s, 
"Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") @@ -399,6 +413,7 @@ func (this *Series) GoString() string { s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n") s = append(s, "LastLine: "+fmt.Sprintf("%#v", this.LastLine)+",\n") s = append(s, "EntryCt: "+fmt.Sprintf("%#v", this.EntryCt)+",\n") + s = append(s, "HighestTs: "+fmt.Sprintf("%#v", this.HighestTs)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -561,6 +576,14 @@ func (m *Series) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintCheckpoint(dAtA, i, uint64(m.EntryCt)) } + dAtA[i] = 0x42 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.HighestTs))) + n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.HighestTs, dAtA[i:]) + if err != nil { + return 0, err + } + i += n6 return i, nil } @@ -638,6 +661,8 @@ func (m *Series) Size() (n int) { if m.EntryCt != 0 { n += 1 + sovCheckpoint(uint64(m.EntryCt)) } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.HighestTs) + n += 1 + l + sovCheckpoint(uint64(l)) return n } @@ -683,6 +708,7 @@ func (this *Series) String() string { `To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `LastLine:` + fmt.Sprintf("%v", this.LastLine) + `,`, `EntryCt:` + fmt.Sprintf("%v", this.EntryCt) + `,`, + `HighestTs:` + strings.Replace(strings.Replace(this.HighestTs.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `}`, }, "") return s @@ -1220,6 +1246,39 @@ func (m *Series) Unmarshal(dAtA []byte) error { break } } + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field HighestTs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return 
ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.HighestTs, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipCheckpoint(dAtA[iNdEx:]) diff --git a/pkg/ingester/checkpoint.proto b/pkg/ingester/checkpoint.proto index 5e3e9104ea3f..426403662693 100644 --- a/pkg/ingester/checkpoint.proto +++ b/pkg/ingester/checkpoint.proto @@ -30,11 +30,13 @@ message Series { uint64 fingerprint = 2; repeated cortexpb.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter"]; repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; - // Last timestamp of the last chunk. + // most recently pushed timestamp. google.protobuf.Timestamp to = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + // most recently pushed line. string lastLine = 6; // highest counter value for pushes to this stream. // Used to skip already applied entries during WAL replay. int64 entryCt = 7; - + // highest timestamp pushed to this stream. 
+ google.protobuf.Timestamp highestTs = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; } diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index e5b1519a9915..d3c3b2dd4efa 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -468,7 +468,7 @@ func Test_SeriesIterator(t *testing.T) { for j := 0; j < 2; j++ { iter.Next() assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID) - memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0) + memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, chunkenc.UnorderedHeadBlockFmt, 0, 0) require.NoError(t, err) it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) require.NoError(t, err) @@ -561,7 +561,7 @@ func buildChunks(t testing.TB, size int) []Chunk { for i := 0; i < size; i++ { // build chunks of 256k blocks, 1.5MB target size. Same as default config. 
- c := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 1500*1024) + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024) fillChunk(t, c) descs = append(descs, chunkDesc{ chunk: c, diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 8948aa52ac3c..6524105271ec 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -48,7 +48,9 @@ func TestIterator(t *testing.T) { new func() chunkenc.Chunk }{ {"dumbChunk", chunkenc.NewDumbChunk}, - {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, + {"gzipChunk", func() chunkenc.Chunk { + return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + }}, } { t.Run(chk.name, func(t *testing.T) { chunk := chk.new() diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index accddddd63b9..15ca335aeec7 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -230,53 +230,87 @@ func dummyConf() *Config { } func Test_EncodingChunks(t *testing.T) { - conf := dummyConf() - c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize) - fillChunk(t, c) - - from := []chunkDesc{ - { - chunk: c, - }, - // test non zero values - { - chunk: c, - closed: true, - synced: true, - flushed: time.Unix(1, 0), - lastUpdated: time.Unix(0, 1), - }, - } - there, err := toWireChunks(from, nil) - require.Nil(t, err) - chunks := make([]Chunk, 0, len(there)) - for _, c := range there { - chunks = append(chunks, c.Chunk) - } - backAgain, err := fromWireChunks(conf, chunks) - require.Nil(t, err) - - for i, to := range backAgain { - // test the encoding directly as the substructure may change. - // for instance the uncompressed size for each block is not included in the encoded version. 
- enc, err := to.chunk.Bytes() - require.Nil(t, err) - to.chunk = nil - - matched := from[i] - exp, err := matched.chunk.Bytes() - require.Nil(t, err) - matched.chunk = nil - - require.Equal(t, exp, enc) - require.Equal(t, matched, to) - + for _, f := range chunkenc.HeadBlockFmts { + for _, close := range []bool{true, false} { + for _, tc := range []struct { + desc string + conf Config + }{ + { + // mostly for historical parity + desc: "dummyConf", + conf: *dummyConf(), + }, + { + desc: "default", + conf: defaultIngesterTestConfig(t), + }, + } { + + t.Run(fmt.Sprintf("%v-%v-%s", f, close, tc.desc), func(t *testing.T) { + conf := tc.conf + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize) + fillChunk(t, c) + if close { + require.Nil(t, c.Close()) + } + + from := []chunkDesc{ + { + chunk: c, + }, + // test non zero values + { + chunk: c, + closed: true, + synced: true, + flushed: time.Unix(1, 0), + lastUpdated: time.Unix(0, 1), + }, + } + there, err := toWireChunks(from, nil) + require.Nil(t, err) + chunks := make([]Chunk, 0, len(there)) + for _, c := range there { + chunks = append(chunks, c.Chunk) + + // Ensure closed head chunks are empty + if close { + require.Equal(t, 0, len(c.Head)) + } else { + require.Greater(t, len(c.Head), 0) + } + } + + backAgain, err := fromWireChunks(&conf, chunks) + require.Nil(t, err) + + for i, to := range backAgain { + // test the encoding directly as the substructure may change. + // for instance the uncompressed size for each block is not included in the encoded version. 
+ enc, err := to.chunk.Bytes() + require.Nil(t, err) + to.chunk = nil + + matched := from[i] + exp, err := matched.chunk.Bytes() + require.Nil(t, err) + matched.chunk = nil + + require.Equal(t, exp, enc) + require.Equal(t, matched, to) + + } + + }) + } + } } } func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() - c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) require.Nil(t, c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 9daf77b651aa..b95983c7c7fc 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -121,7 +121,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc { for i := range res { res[i] = &chunkDesc{ closed: true, - chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, dummyConf().BlockSize, dummyConf().TargetChunkSize), + chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), } fillChunk(t, res[i].chunk) require.NoError(t, res[i].chunk.Close()) @@ -289,6 +289,9 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 cfg.LifecyclerConfig.MinReadyDuration = 0 + cfg.BlockSize = 256 * 1024 + cfg.TargetChunkSize = 1500 * 1024 + cfg.UnorderedWrites = true return cfg } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e7f947ccf629..7d0b2f820f92 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -80,6 +80,8 @@ type Config struct { WAL WALConfig `yaml:"wal,omitempty"` ChunkFilterer storage.RequestChunkFilterer `yaml:"-"` + + UnorderedWrites bool `yaml:"unordered_writes_enabled"` } // RegisterFlags registers the flags. 
@@ -102,6 +104,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.") f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.") f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`") + f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.") } func (cfg *Config) Validate() error { diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 908bffb312f0..3d0c5c4ed29b 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -126,6 +126,7 @@ func (r *ingesterRecoverer) Series(series *Series) error { stream.lastLine.ts = series.To stream.lastLine.content = series.LastLine stream.entryCt = series.EntryCt + stream.highestTs = series.HighestTs if err != nil { return err diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 657708416d2b..cb61f4ccf670 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -72,8 +72,17 @@ type stream struct { labels labels.Labels labelsString string - lastLine line - metrics *ingesterMetrics + + // most recently pushed line. This is used to prevent duplicate pushes. + // It also determines chunk synchronization when unordered writes are disabled. + lastLine line + + // keeps track of the highest timestamp accepted by the stream. + // This is used when unordered writes are enabled to cap the validity window + // of accepted writes and for chunk synchronization. 
+ highestTs time.Time + + metrics *ingesterMetrics tailers map[uint32]*tailer tailerMtx sync.RWMutex @@ -113,6 +122,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics // consumeChunk manually adds a chunk to the stream that was received during // ingester chunk transfer. +// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility. func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize) if err != nil { @@ -145,7 +155,11 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er } func (s *stream) NewChunk() *chunkenc.MemChunk { - return chunkenc.NewMemChunk(s.cfg.parsedEncoding, s.cfg.BlockSize, s.cfg.TargetChunkSize) + hbType := chunkenc.OrderedHeadBlockFmt + if s.cfg.UnorderedWrites { + hbType = chunkenc.UnorderedHeadBlockFmt + } + return chunkenc.NewMemChunk(s.cfg.parsedEncoding, hbType, s.cfg.BlockSize, s.cfg.TargetChunkSize) } func (s *stream) Push( @@ -169,14 +183,11 @@ func (s *stream) Push( var bytesAdded int prevNumChunks := len(s.chunks) - var lastChunkTimestamp time.Time if prevNumChunks == 0 { s.chunks = append(s.chunks, chunkDesc{ chunk: s.NewChunk(), }) chunksCreatedTotal.Inc() - } else { - _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds() } var storedEntries []logproto.Entry @@ -198,7 +209,7 @@ func (s *stream) Push( } chunk := &s.chunks[len(s.chunks)-1] - if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { + if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { // If the chunk has no more space call Close to make sure anything in the head block is cut and compressed err := chunk.chunk.Close() if err != 
nil { @@ -216,15 +227,20 @@ func (s *stream) Push( chunk: s.NewChunk(), }) chunk = &s.chunks[len(s.chunks)-1] - lastChunkTimestamp = time.Time{} } - if err := chunk.chunk.Append(&entries[i]); err != nil { + + // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age. + if s.cfg.UnorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) { + failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder}) + } else if err := chunk.chunk.Append(&entries[i]); err != nil { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err}) } else { storedEntries = append(storedEntries, entries[i]) - lastChunkTimestamp = entries[i].Timestamp - s.lastLine.ts = lastChunkTimestamp + s.lastLine.ts = entries[i].Timestamp s.lastLine.content = entries[i].Line + if s.highestTs.Before(entries[i].Timestamp) { + s.highestTs = entries[i].Timestamp + } s.entryCt++ // length of string plus @@ -307,15 +323,17 @@ func (s *stream) Push( // Returns true, if chunk should be cut before adding new entry. This is done to make ingesters // cut the chunk for this stream at the same moment, so that new chunk will contain exactly the same entries. -func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool { - if synchronizePeriod <= 0 || prevEntryTimestamp.IsZero() { +func (s *stream) cutChunkForSynchronization(entryTimestamp, latestTs time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool { + // Never sync when it's not enabled, it's the first push, or if a write isn't the latest ts + // to prevent syncing many unordered writes. 
+ if synchronizePeriod <= 0 || latestTs.IsZero() || latestTs.After(entryTimestamp) { return false } // we use fingerprint as a jitter here, basically offsetting stream synchronization points to different // this breaks if streams are mapped to different fingerprints on different ingesters, which is too bad. cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds()) - pts := (uint64(prevEntryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds()) + pts := (uint64(latestTs.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds()) // if current entry timestamp has rolled over synchronization period if cts < pts { diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index f845f6b7f9d5..090be380d3f0 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" ) @@ -137,7 +138,9 @@ func TestStreamIterator(t *testing.T) { name string new func() *chunkenc.MemChunk }{ - {"gzipChunk", func() *chunkenc.MemChunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, + {"gzipChunk", func() *chunkenc.MemChunk { + return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + }}, } { t.Run(chk.name, func(t *testing.T) { var s stream @@ -177,6 +180,79 @@ func TestStreamIterator(t *testing.T) { } } +func TestUnorderedPush(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.MaxChunkAge = 10 * time.Second + s := newStream( + &cfg, + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + NilMetrics, + ) + + for _, x := range []struct { + entries []logproto.Entry + err bool + written int + }{ + { + entries: []logproto.Entry{ + {Timestamp: time.Unix(2, 0), Line: "x"}, + {Timestamp: 
time.Unix(1, 0), Line: "x"}, + {Timestamp: time.Unix(2, 0), Line: "x"}, + {Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored + {Timestamp: time.Unix(10, 0), Line: "x"}, + }, + written: 4, // 1 ignored + }, + // highest ts is now 10, validity bound is (10-10/2) = 5 + { + entries: []logproto.Entry{ + {Timestamp: time.Unix(4, 0), Line: "x"}, // ordering err, too far + {Timestamp: time.Unix(8, 0), Line: "x"}, + {Timestamp: time.Unix(9, 0), Line: "x"}, + }, + err: true, + written: 2, // 1 ignored + }, + } { + written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0) + if x.err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + require.Equal(t, x.written, written) + } + + exp := []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "x"}, + {Timestamp: time.Unix(2, 0), Line: "x"}, + // duplicate was allowed here because it wasn't written sequentially + {Timestamp: time.Unix(2, 0), Line: "x"}, + {Timestamp: time.Unix(8, 0), Line: "x"}, + {Timestamp: time.Unix(9, 0), Line: "x"}, + {Timestamp: time.Unix(10, 0), Line: "x"}, + } + + itr, err := s.Iterator(context.Background(), nil, time.Unix(int64(0), 0), time.Unix(11, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(s.labels)) + require.Nil(t, err) + iterEq(t, exp, itr) + +} + +func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) { + var i int + for got.Next() { + require.Equal(t, exp[i].Timestamp, got.Entry().Timestamp, "failed on the (%d) ts", i) + require.Equal(t, exp[i].Line, got.Entry().Line) + i++ + } + require.Equal(t, i, len(exp)) +} + func Benchmark_PushStream(b *testing.B) { ls := labels.Labels{ labels.Label{Name: "namespace", Value: "loki-dev"}, diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 28ec897282b0..abf1657db6ac 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -30,6 +30,7 @@ func TestTransferOut(t *testing.T) { f := newTestIngesterFactory(t) ing := 
f.getIngester(time.Duration(0), t) + ing.cfg.UnorderedWrites = false // enforce ordered writes on old testware (transfers are deprecated). // Push some data into our original ingester ctx := user.InjectOrgID(context.Background(), "test") diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index f069964fb288..8088c0bf33e9 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -107,7 +107,7 @@ func fillStore() error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -130,7 +130,7 @@ func fillStore() error { if flushCount >= maxChunks { return } - chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864) + chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864) } } }(i) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 0a0caf856521..59f2e2dcce96 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -222,7 +222,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { require.NoError(t, 
chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index bf3ba21b29b9..59f503a447d7 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -99,9 +99,8 @@ func newChunk(stream logproto.Stream) chunk.Chunk { builder.Set(labels.MetricName, "logs") lbs = builder.Labels() } - from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) - chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) + chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) for _, e := range stream.Entries { _ = chk.Append(&e) }