diff --git a/CHANGELOG.md b/CHANGELOG.md
index 33670819a0d6..541ed5f02918 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@
 * [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
 * [5997](https://github.com/grafana/loki/pull/5997) **simonswine**: Querier: parallize label queries to both stores.
 * [5406](https://github.com/grafana/loki/pull/5406) **ctovena**: Revise the configuration parameters that configure the usage report to grafana.com.
+* [7264](https://github.com/grafana/loki/pull/7264) **bboreham**: Chunks: decode varints directly from byte buffer, for speed.
 * [7263](https://github.com/grafana/loki/pull/7263) **bboreham**: Dependencies: klauspost/compress package to v1.15.11; improves performance.
 
 ##### Fixes
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index 4dec0ca13301..0bc622c03876 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -1,7 +1,6 @@
 package chunkenc
 
 import (
-	"bufio"
 	"bytes"
 	"context"
 	"encoding/binary"
@@ -1114,12 +1113,14 @@ type bufferedIterator struct {
 	origBytes []byte
 	stats     *stats.Context
 
-	bufReader *bufio.Reader
-	reader    io.Reader
-	pool      ReaderPool
+	reader io.Reader
+	pool   ReaderPool
 
 	err error
 
+	readBuf      [20]byte // Enough bytes to store two varints.
+	readBufValid int      // How many bytes are left in readBuf from previous read.
+
 	buf       []byte // The buffer for a single entry.
 	currLine  []byte // the current line, this is the same as the buffer but sliced the the line size.
 	currTs    int64
@@ -1134,7 +1135,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer
 		stats:     stats,
 		origBytes: b,
 		reader:    nil, // will be initialized later
-		bufReader: nil, // will be initialized later
 		pool:      pool,
 	}
 }
@@ -1146,8 +1146,12 @@ func (si *bufferedIterator) Next() bool {
 
 	if !si.closed && si.reader == nil {
 		// initialize reader now, hopefully reusing one of the previous readers
-		si.reader = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
-		si.bufReader = BufReaderPool.Get(si.reader)
+		var err error
+		si.reader, err = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
+		if err != nil {
+			si.err = err
+			return false
+		}
 	}
 
 	ts, line, ok := si.moveNext()
@@ -1166,22 +1170,30 @@
 
 // moveNext moves the buffer to the next entry
 func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
-	ts, err := binary.ReadVarint(si.bufReader)
-	if err != nil {
-		if err != io.EOF {
-			si.err = err
-		}
-		return 0, nil, false
-	}
-
-	l, err := binary.ReadUvarint(si.bufReader)
-	if err != nil {
-		if err != io.EOF {
-			si.err = err
-			return 0, nil, false
+	var ts int64
+	var tWidth, lWidth, lineSize, lastAttempt int
+	for lWidth == 0 { // Read until both varints have enough bytes.
+		n, err := si.reader.Read(si.readBuf[si.readBufValid:])
+		si.readBufValid += n
+		if err != nil {
+			if err != io.EOF {
+				si.err = err
+				return 0, nil, false
+			}
+			if si.readBufValid == 0 { // Got EOF and no data in the buffer.
+				return 0, nil, false
+			}
+			if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
+				si.err = fmt.Errorf("invalid data in chunk")
+				return 0, nil, false
+			}
 		}
+		var l uint64
+		ts, tWidth = binary.Varint(si.readBuf[:si.readBufValid])
+		l, lWidth = binary.Uvarint(si.readBuf[tWidth:si.readBufValid])
+		lineSize = int(l)
+		lastAttempt = si.readBufValid
 	}
-	lineSize := int(l)
 
 	if lineSize >= maxLineLength {
 		si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
@@ -1199,19 +1211,25 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
 			return 0, nil, false
 		}
 	}
+	si.buf = si.buf[:lineSize]
+	// Take however many bytes are left in the read buffer.
+	n := copy(si.buf, si.readBuf[tWidth+lWidth:si.readBufValid])
+	// Shift down what is still left in the fixed-size read buffer, if any.
+	si.readBufValid = copy(si.readBuf[:], si.readBuf[tWidth+lWidth+n:si.readBufValid])
+	// Then process reading the line.
 
-	n, err := si.bufReader.Read(si.buf[:lineSize])
-	if err != nil && err != io.EOF {
-		si.err = err
-		return 0, nil, false
-	}
 	for n < lineSize {
-		r, err := si.bufReader.Read(si.buf[n:lineSize])
-		if err != nil && err != io.EOF {
+		r, err := si.reader.Read(si.buf[n:lineSize])
+		n += r
+		if err != nil {
+			// We might get EOF after reading enough bytes to fill the buffer, which is OK.
+			// EOF and zero bytes read when the buffer isn't full is an error.
+			if err == io.EOF && r != 0 {
+				continue
+			}
 			si.err = err
 			return 0, nil, false
 		}
-		n += r
 	}
 	return ts, si.buf[:lineSize], true
 }
@@ -1231,10 +1249,6 @@ func (si *bufferedIterator) close() {
 		si.pool.PutReader(si.reader)
 		si.reader = nil
	}
-	if si.bufReader != nil {
-		BufReaderPool.Put(si.bufReader)
-		si.bufReader = nil
-	}
 
 	if si.buf != nil {
 		BytesBufferPool.Put(si.buf)
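
As background on the new decode path (a standalone sketch, not part of the patch): `encoding/binary` can decode varints directly from a byte slice, returning the value and the number of bytes consumed; a width of 0 signals a truncated buffer, which is exactly the condition the `for lWidth == 0` loop in `moveNext` retries on.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode a timestamp varint followed by a length uvarint, the two
	// values moveNext decodes straight out of readBuf. readBuf is
	// [20]byte because a 64-bit varint needs at most 10 bytes.
	buf := make([]byte, 2*binary.MaxVarintLen64)
	n := binary.PutVarint(buf, 1665000000000000000) // nanosecond timestamp
	n += binary.PutUvarint(buf[n:], 42)             // line length

	// Decode directly from the slice; the second return value is the
	// width in bytes, so the line bytes start at buf[tWidth+lWidth:].
	ts, tWidth := binary.Varint(buf[:n])
	l, lWidth := binary.Uvarint(buf[tWidth:n])
	fmt.Println(ts, l, tWidth, lWidth) // 1665000000000000000 42 9 1

	// A truncated buffer yields width 0: the caller must read more
	// bytes and retry, as the loop in moveNext does.
	_, w := binary.Varint(buf[:2])
	fmt.Println(w) // 0
}
```
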
diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go
index 11d02384f386..613185cd6ec1 100644
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -178,6 +178,38 @@ func TestBlock(t *testing.T) {
 	}
 }
 
+func TestCorruptChunk(t *testing.T) {
+	for _, enc := range testEncoding {
+		t.Run(enc.String(), func(t *testing.T) {
+			t.Parallel()
+
+			chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
+			cases := []struct {
+				data []byte
+			}{
+				// Data that should not decode as lines from a chunk in any encoding.
+				{data: []byte{0}},
+				{data: []byte{1}},
+				{data: []byte("asdfasdfasdfqwyteqwtyeq")},
+			}
+
+			ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
+			for i, c := range cases {
+				chk.blocks = []block{{b: c.data}}
+				it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
+				require.NoError(t, err, "case %d", i)
+
+				idx := 0
+				for it.Next() {
+					idx++
+				}
+				require.Error(t, it.Error(), "case %d", i)
+				require.NoError(t, it.Close())
+			}
+		})
+	}
+}
+
 func TestReadFormatV1(t *testing.T) {
 	t.Parallel()
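
The corrupt inputs above are bytes that cannot be the start of a valid entry. For reference, a hypothetical encoder reconstructed from the decode path in `moveNext` (not code from this patch) showing the entry layout a block is expected to contain:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendEntry is a hypothetical helper showing the layout moveNext
// parses: a varint timestamp, a uvarint line length, then the raw line
// bytes. Inputs that cannot complete this sequence, like the single
// bytes in TestCorruptChunk, must surface as an iterator error.
func appendEntry(dst []byte, ts int64, line []byte) []byte {
	var scratch [binary.MaxVarintLen64]byte
	n := binary.PutVarint(scratch[:], ts)
	dst = append(dst, scratch[:n]...)
	n = binary.PutUvarint(scratch[:], uint64(len(line)))
	dst = append(dst, scratch[:n]...)
	return append(dst, line...)
}

func main() {
	b := appendEntry(nil, 1, []byte("hello"))
	fmt.Println(b) // [2 5 104 101 108 108 111]
}
```
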
diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go
index b73dd6b805bb..c8f5359b78a5 100644
--- a/pkg/chunkenc/pool.go
+++ b/pkg/chunkenc/pool.go
@@ -26,7 +26,7 @@ type WriterPool interface {
 
 // ReaderPool similar to WriterPool but for reading chunks.
 type ReaderPool interface {
-	GetReader(io.Reader) io.Reader
+	GetReader(io.Reader) (io.Reader, error)
 	PutReader(io.Reader)
 }
 
@@ -44,13 +44,6 @@ var (
 	// Noop is the no compression pool
 	Noop NoopPool
 
-	// BufReaderPool is bufio.Reader pool
-	BufReaderPool = &BufioReaderPool{
-		pool: sync.Pool{
-			New: func() interface{} { return bufio.NewReader(nil) },
-		},
-	}
-
 	// BytesBufferPool is a bytes buffer used for lines decompressed.
 	// Buckets [0.5KB,1KB,2KB,4KB,8KB]
 	BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
@@ -117,21 +110,32 @@ type GzipPool struct {
 	level int
 }
 
+// Gzip needs buffering to read efficiently.
+// We need to be able to see the underlying gzip.Reader to Reset it.
+type gzipBufferedReader struct {
+	*bufio.Reader
+	gzipReader *gzip.Reader
+}
+
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *GzipPool) GetReader(src io.Reader) io.Reader {
+func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) {
 	if r := pool.readers.Get(); r != nil {
-		reader := r.(*gzip.Reader)
-		err := reader.Reset(src)
+		reader := r.(*gzipBufferedReader)
+		err := reader.gzipReader.Reset(src)
 		if err != nil {
-			panic(err)
+			return nil, err
 		}
-		return reader
+		reader.Reader.Reset(reader.gzipReader)
+		return reader, nil
 	}
-	reader, err := gzip.NewReader(src)
+	gzipReader, err := gzip.NewReader(src)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
-	return reader
+	return &gzipBufferedReader{
+		gzipReader: gzipReader,
+		Reader:     bufio.NewReaderSize(gzipReader, 4*1024),
+	}, nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -171,16 +175,16 @@ type FlatePool struct {
 }
 
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *FlatePool) GetReader(src io.Reader) io.Reader {
+func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) {
 	if r := pool.readers.Get(); r != nil {
 		reader := r.(flate.Resetter)
 		err := reader.Reset(src, nil)
 		if err != nil {
 			panic(err)
 		}
-		return reader.(io.Reader)
+		return reader.(io.Reader), nil
 	}
-	return flate.NewReader(src)
+	return flate.NewReader(src), nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -219,21 +223,21 @@ type ZstdPool struct {
 }
 
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *ZstdPool) GetReader(src io.Reader) io.Reader {
+func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) {
 	if r := pool.readers.Get(); r != nil {
 		reader := r.(*zstd.Decoder)
 		err := reader.Reset(src)
 		if err != nil {
-			panic(err)
+			return nil, err
 		}
-		return reader
+		return reader, nil
 	}
 	reader, err := zstd.NewReader(src)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	runtime.SetFinalizer(reader, (*zstd.Decoder).Close)
-	return reader
+	return reader, nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -267,16 +271,27 @@ type LZ4Pool struct {
 	bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set.
 }
 
+// We need to be able to see the underlying lz4.Reader to Reset it.
+type lz4BufferedReader struct {
+	*bufio.Reader
+	lz4Reader *lz4.Reader
+}
+
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader {
-	var r *lz4.Reader
+func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) {
+	var r *lz4BufferedReader
 	if pooled := pool.readers.Get(); pooled != nil {
-		r = pooled.(*lz4.Reader)
-		r.Reset(src)
+		r = pooled.(*lz4BufferedReader)
+		r.lz4Reader.Reset(src)
+		r.Reader.Reset(r.lz4Reader)
 	} else {
-		r = lz4.NewReader(src)
+		lz4Reader := lz4.NewReader(src)
+		r = &lz4BufferedReader{
+			lz4Reader: lz4Reader,
+			Reader:    bufio.NewReaderSize(lz4Reader, 4*1024),
+		}
 	}
-	return r
+	return r, nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -315,13 +330,13 @@ type SnappyPool struct {
 }
 
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *SnappyPool) GetReader(src io.Reader) io.Reader {
+func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
 	if r := pool.readers.Get(); r != nil {
 		reader := r.(*snappy.Reader)
 		reader.Reset(src)
-		return reader
+		return reader, nil
 	}
-	return snappy.NewReader(src)
+	return snappy.NewReader(src), nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -347,8 +362,8 @@ func (pool *SnappyPool) PutWriter(writer io.WriteCloser) {
 type NoopPool struct{}
 
 // GetReader gets or creates a new CompressionReader and reset it to read from src
-func (pool *NoopPool) GetReader(src io.Reader) io.Reader {
-	return src
+func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) {
+	return src, nil
 }
 
 // PutReader places back in the pool a CompressionReader
@@ -367,23 +382,3 @@ func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser {
 
 // PutWriter places back in the pool a CompressionWriter
 func (pool *NoopPool) PutWriter(writer io.WriteCloser) {}
-
-// BufioReaderPool is a bufio reader that uses sync.Pool.
-type BufioReaderPool struct {
-	pool sync.Pool
-}
-
-// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
-func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
-	buf := bufPool.pool.Get().(*bufio.Reader)
-	if buf == nil {
-		return bufio.NewReaderSize(r, 4*1024)
-	}
-	buf.Reset(r)
-	return buf
-}
-
-// Put puts the bufio.Reader back into the pool.
-func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
-	bufPool.pool.Put(b)
-}
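
The two wrapper types above share one pattern; here is a minimal standalone sketch of it with illustrative names (not code from the patch): embedding `*bufio.Reader` gives callers buffered `Read` calls, while the retained decompressor handle is what `Reset` is called on when a reader is reused from the pool.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// bufferedDecompressor mirrors gzipBufferedReader/lz4BufferedReader:
// Read goes through the embedded *bufio.Reader, and the retained
// gzip.Reader is what gets Reset onto a new source on reuse.
type bufferedDecompressor struct {
	*bufio.Reader
	gz *gzip.Reader
}

// reset re-points both layers at a new compressed stream, the same
// two-step dance GzipPool.GetReader does for a pooled reader.
func (b *bufferedDecompressor) reset(src io.Reader) error {
	if err := b.gz.Reset(src); err != nil {
		return err
	}
	b.Reader.Reset(b.gz)
	return nil
}

func main() {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	zw.Write([]byte("hello"))
	zw.Close()

	gz, err := gzip.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		panic(err)
	}
	r := &bufferedDecompressor{gz: gz, Reader: bufio.NewReaderSize(gz, 4*1024)}
	out, _ := io.ReadAll(r)
	fmt.Println(string(out)) // hello

	// Reuse the same wrapper on a second stream, as the pool does.
	if err := r.reset(bytes.NewReader(compressed.Bytes())); err != nil {
		panic(err)
	}
	out, _ = io.ReadAll(r)
	fmt.Println(string(out)) // hello (same data, reused reader)
}
```
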
+type lz4BufferedReader struct { + *bufio.Reader + lz4Reader *lz4.Reader +} + // GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader { - var r *lz4.Reader +func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) { + var r *lz4BufferedReader if pooled := pool.readers.Get(); pooled != nil { - r = pooled.(*lz4.Reader) - r.Reset(src) + r = pooled.(*lz4BufferedReader) + r.lz4Reader.Reset(src) + r.Reader.Reset(r.lz4Reader) } else { - r = lz4.NewReader(src) + lz4Reader := lz4.NewReader(src) + r = &lz4BufferedReader{ + lz4Reader: lz4Reader, + Reader: bufio.NewReaderSize(lz4Reader, 4*1024), + } } - return r + return r, nil } // PutReader places back in the pool a CompressionReader @@ -315,13 +330,13 @@ type SnappyPool struct { } // GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *SnappyPool) GetReader(src io.Reader) io.Reader { +func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) { if r := pool.readers.Get(); r != nil { reader := r.(*snappy.Reader) reader.Reset(src) - return reader + return reader, nil } - return snappy.NewReader(src) + return snappy.NewReader(src), nil } // PutReader places back in the pool a CompressionReader @@ -347,8 +362,8 @@ func (pool *SnappyPool) PutWriter(writer io.WriteCloser) { type NoopPool struct{} // GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *NoopPool) GetReader(src io.Reader) io.Reader { - return src +func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) { + return src, nil } // PutReader places back in the pool a CompressionReader @@ -367,23 +382,3 @@ func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser { // PutWriter places back in the pool a CompressionWriter func (pool *NoopPool) PutWriter(writer io.WriteCloser) {} - -// BufioReaderPool is a bufio reader that uses sync.Pool. -type BufioReaderPool struct { - pool sync.Pool -} - -// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool. -func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader { - buf := bufPool.pool.Get().(*bufio.Reader) - if buf == nil { - return bufio.NewReaderSize(r, 4*1024) - } - buf.Reset(r) - return buf -} - -// Put puts the bufio.Reader back into the pool. 
-func (bufPool *BufioReaderPool) Put(b *bufio.Reader) { - bufPool.pool.Put(b) -} diff --git a/pkg/chunkenc/pool_test.go b/pkg/chunkenc/pool_test.go index 6b15b5496b86..978945006719 100644 --- a/pkg/chunkenc/pool_test.go +++ b/pkg/chunkenc/pool_test.go @@ -36,7 +36,8 @@ func TestPool(t *testing.T) { require.NoError(t, w.Close()) require.True(t, buf.Len() != 0, enc) - r := rpool.GetReader(bytes.NewBuffer(buf.Bytes())) + r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes())) + require.NoError(t, err) defer rpool.PutReader(r) n, err := r.Read(res) if err != nil { diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index 42c2c2eeddce..d3106b1852e8 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -95,10 +95,12 @@ func NewTSDBIndexFromFile(location string, gzip bool) (*TSDBIndex, []byte, error // decompress if needed if gzip { - r := chunkenc.Gzip.GetReader(bytes.NewReader(raw)) + r, err := chunkenc.Gzip.GetReader(bytes.NewReader(raw)) + if err != nil { + return nil, nil, err + } defer chunkenc.Gzip.PutReader(r) - var err error cleaned, err = io.ReadAll(r) if err != nil { return nil, nil, err