chunks: decode varints directly from byte buffer; stop panicking on some corrupt inputs #7264

Merged: 5 commits, Sep 30, 2022

Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@
* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
* [5997](https://github.com/grafana/loki/pull/5997) **simonswine**: Querier: parallelize label queries to both stores.
* [5406](https://github.com/grafana/loki/pull/5406) **ctovena**: Revise the configuration parameters that configure the usage report to grafana.com.
* [7264](https://github.com/grafana/loki/pull/7264) **bboreham**: Chunks: decode varints directly from byte buffer, for speed.
* [7263](https://github.com/grafana/loki/pull/7263) **bboreham**: Dependencies: klauspost/compress package to v1.15.11; improves performance.

##### Fixes
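For context on the speed claim in [7264]: the change swaps `binary.ReadVarint`, which pulls one byte at a time through an `io.ByteReader` interface, for `binary.Varint`, which decodes in place from a byte slice. A minimal standalone sketch (not from the PR) contrasting the two stdlib calls:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, 1234567890)

	// Old path: one ReadByte interface call per encoded byte.
	v1, _ := binary.ReadVarint(bufio.NewReader(bytes.NewReader(buf[:n])))

	// New path: decode directly from the slice; the second return value
	// is the number of bytes consumed (0 means the slice was too short).
	v2, w := binary.Varint(buf[:n])

	fmt.Println(v1, v2, w) // 1234567890 1234567890 5
}
```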
80 changes: 47 additions & 33 deletions pkg/chunkenc/memchunk.go
@@ -1,7 +1,6 @@
package chunkenc

import (
"bufio"
"bytes"
"context"
"encoding/binary"
@@ -1114,12 +1113,14 @@ type bufferedIterator struct {
origBytes []byte
stats *stats.Context

bufReader *bufio.Reader
reader io.Reader
pool ReaderPool
reader io.Reader
pool ReaderPool

err error

readBuf [20]byte // Enough bytes to store two varints.
readBufValid int // How many bytes are left in readBuf from previous read.

buf []byte // The buffer for a single entry.
currLine []byte // the current line, this is the same as the buffer but sliced to the line size.
currTs int64
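Why 20 bytes is enough (a quick check, not part of the PR): a varint-encoded 64-bit value occupies at most `binary.MaxVarintLen64` = 10 bytes, so the timestamp varint plus the line-length uvarint always fit:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

func main() {
	var buf [2 * binary.MaxVarintLen64]byte         // 20 bytes
	n := binary.PutVarint(buf[:], math.MinInt64)    // worst-case timestamp: 10 bytes
	n += binary.PutUvarint(buf[n:], math.MaxUint64) // worst-case length: 10 bytes
	fmt.Println(n, len(buf))                        // 20 20
}
```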
@@ -1134,7 +1135,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *buffer
stats: stats,
origBytes: b,
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
pool: pool,
}
}
@@ -1146,8 +1146,12 @@ func (si *bufferedIterator) Next() bool {

if !si.closed && si.reader == nil {
// initialize reader now, hopefully reusing one of the previous readers
si.reader = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
si.bufReader = BufReaderPool.Get(si.reader)
var err error
si.reader, err = si.pool.GetReader(bytes.NewBuffer(si.origBytes))
if err != nil {
si.err = err
return false
}
}

ts, line, ok := si.moveNext()
@@ -1166,22 +1170,30 @@

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
ts, err := binary.ReadVarint(si.bufReader)
if err != nil {
if err != io.EOF {
si.err = err
}
return 0, nil, false
}

l, err := binary.ReadUvarint(si.bufReader)
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, false
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
Comment from bboreham (PR author): I realise this can loop forever on some corrupt inputs - if there are some bytes in the buffer but not enough to decode two varints. Need to detect this somehow and error. (Addressed in the final code below: the lastAttempt check errors out when EOF arrives with no parsing progress.)
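Background for the loop condition (stdlib behaviour, not PR code): `binary.Varint`/`Uvarint` signal a too-short buffer by returning a width of 0, which is why the loop keeps reading until `lWidth != 0` and must separately detect that no further bytes are coming:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Both bytes have the continuation bit set, so the value is incomplete.
	truncated := []byte{0x80, 0x80}
	v, n := binary.Uvarint(truncated)
	fmt.Println(v, n) // 0 0 -> caller must read more bytes, or error out on EOF
}
```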

n, err := si.reader.Read(si.readBuf[si.readBufValid:])
si.readBufValid += n
if err != nil {
if err != io.EOF {
si.err = err
return 0, nil, false
}
if si.readBufValid == 0 { // Got EOF and no data in the buffer.
return 0, nil, false
}
if si.readBufValid == lastAttempt { // Got EOF and could not parse same data last time.
si.err = fmt.Errorf("invalid data in chunk")
return 0, nil, false
}
}
var l uint64
ts, tWidth = binary.Varint(si.readBuf[:si.readBufValid])
Reviewer comment: 💯

l, lWidth = binary.Uvarint(si.readBuf[tWidth:si.readBufValid])
lineSize = int(l)
lastAttempt = si.readBufValid
}
lineSize := int(l)

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
@@ -1199,19 +1211,25 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
return 0, nil, false
}
}
si.buf = si.buf[:lineSize]
// Take however many bytes are left in the read buffer.
n := copy(si.buf, si.readBuf[tWidth+lWidth:si.readBufValid])
// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[tWidth+lWidth+n:si.readBufValid])
Comment on lines +1217 to +1218

Reviewer (member): I think this is impossible, since si.buf will be long enough to hold the line (logic above) and "Copy returns the number of elements copied, which will be the minimum of len(src) and len(dst)." Adjusting readBuf & readBufValid could further complicate the next moveNext call if there's not enough space remaining for the varints (timestamp, linesize).

Suggested change:
-	// Shift down what is still left in the fixed-size read buffer, if any.
-	si.readBufValid = copy(si.readBuf[:], si.readBuf[tWidth+lWidth+n:si.readBufValid])
+	si.readBufValid = 0

Again, I think this is impossible, but this will make the code a little clearer.

bboreham (PR author): Suppose the line was of length 0 bytes, or anything up to about 10. Then tWidth is maybe 4, lWidth is 1 and n is <=10, but readBufValid can be 20. So we need to copy.

Reviewer (member): Ah, yes this makes sense. I definitely missed it on my first pass; thanks for the help and great PR!
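A quick arithmetic check of the author's example (illustrative values only): with a 4-byte timestamp varint, a 1-byte length varint and a 3-byte line, a full 20-byte read buffer still holds 12 bytes belonging to the next entry, so the shift-down copy is required:

```go
package main

import "fmt"

func main() {
	tWidth, lWidth, lineSize, readBufValid := 4, 1, 3, 20
	n := lineSize // the whole short line is satisfied from readBuf
	leftover := readBufValid - (tWidth + lWidth + n)
	fmt.Println(leftover) // 12 bytes must be shifted down, not discarded
}
```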


// Then process reading the line.
n, err := si.bufReader.Read(si.buf[:lineSize])
if err != nil && err != io.EOF {
si.err = err
return 0, nil, false
}
for n < lineSize {
r, err := si.bufReader.Read(si.buf[n:lineSize])
if err != nil && err != io.EOF {
r, err := si.reader.Read(si.buf[n:lineSize])
n += r
if err != nil {
// We might get EOF after reading enough bytes to fill the buffer, which is OK.
// EOF and zero bytes read when the buffer isn't full is an error.
if err == io.EOF && r != 0 {
continue
}
si.err = err
return 0, nil, false
}
n += r
}
return ts, si.buf[:lineSize], true
}
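The retry loop above leans on a subtle part of the `io.Reader` contract: a `Read` may return n > 0 together with `io.EOF`, and that final data is still valid. A small demonstration using the stdlib test helper:

```go
package main

import (
	"fmt"
	"strings"
	"testing/iotest"
)

func main() {
	// DataErrReader makes the wrapped reader return io.EOF alongside the
	// final bytes instead of on the following call.
	r := iotest.DataErrReader(strings.NewReader("abc"))
	buf := make([]byte, 8)
	n, err := r.Read(buf)
	fmt.Println(n, err) // 3 EOF -> data and EOF delivered in the same call
}
```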
@@ -1231,10 +1249,6 @@ func (si *bufferedIterator) close() {
si.pool.PutReader(si.reader)
si.reader = nil
}
if si.bufReader != nil {
BufReaderPool.Put(si.bufReader)
si.bufReader = nil
}

if si.buf != nil {
BytesBufferPool.Put(si.buf)
32 changes: 32 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -178,6 +178,38 @@ func TestBlock(t *testing.T) {
}
}

func TestCorruptChunk(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()

chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
cases := []struct {
data []byte
}{
// Data that should not decode as lines from a chunk in any encoding.
{data: []byte{0}},
{data: []byte{1}},
{data: []byte("asdfasdfasdfqwyteqwtyeq")},
}

ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
for i, c := range cases {
chk.blocks = []block{{b: c.data}}
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err, "case %d", i)

idx := 0
for it.Next() {
idx++
}
require.Error(t, it.Error(), "case %d", i)
require.NoError(t, it.Close())
}
})
}
}

func TestReadFormatV1(t *testing.T) {
t.Parallel()

105 changes: 50 additions & 55 deletions pkg/chunkenc/pool.go
@@ -26,7 +26,7 @@ type WriterPool interface {

// ReaderPool similar to WriterPool but for reading chunks.
type ReaderPool interface {
GetReader(io.Reader) io.Reader
GetReader(io.Reader) (io.Reader, error)
PutReader(io.Reader)
}
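With `GetReader` now returning an error, callers can surface unreadable compressed data instead of panicking. A self-contained sketch of the call-site pattern (stand-in types; the real implementations follow below):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// Stand-ins so the sketch compiles on its own.
type readerPool interface {
	GetReader(io.Reader) (io.Reader, error)
	PutReader(io.Reader)
}

type noopPool struct{}

func (noopPool) GetReader(src io.Reader) (io.Reader, error) { return src, nil }
func (noopPool) PutReader(io.Reader)                        {}

func main() {
	var pool readerPool = noopPool{}
	r, err := pool.GetReader(bytes.NewBufferString("hello"))
	if err != nil {
		fmt.Println("corrupt chunk:", err) // propagate instead of panicking
		return
	}
	defer pool.PutReader(r)
	b, _ := io.ReadAll(r)
	fmt.Println(string(b)) // hello
}
```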

@@ -44,13 +44,6 @@ var (
// Noop is the no compression pool
Noop NoopPool

// BufReaderPool is bufio.Reader pool
BufReaderPool = &BufioReaderPool{
pool: sync.Pool{
New: func() interface{} { return bufio.NewReader(nil) },
},
}

// BytesBufferPool is a bytes buffer used for lines decompressed.
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
@@ -117,21 +110,32 @@ type GzipPool struct {
level int
}

// Gzip needs buffering to read efficiently.
// We need to be able to see the underlying gzip.Reader to Reset it.
type gzipBufferedReader struct {
*bufio.Reader
gzipReader *gzip.Reader
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *GzipPool) GetReader(src io.Reader) io.Reader {
func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
reader := r.(*gzipBufferedReader)
err := reader.gzipReader.Reset(src)
if err != nil {
panic(err)
return nil, err
}
return reader
reader.Reader.Reset(reader.gzipReader)
return reader, nil
}
reader, err := gzip.NewReader(src)
gzipReader, err := gzip.NewReader(src)
if err != nil {
panic(err)
return nil, err
}
return reader
return &gzipBufferedReader{
gzipReader: gzipReader,
Reader: bufio.NewReaderSize(gzipReader, 4*1024),
}, nil
}
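Design note: since the iterator no longer wraps the decompressor in its own pooled `bufio.Reader`, pools whose readers do expensive work per `Read` call (gzip here, LZ4 below) now carry the buffer themselves. A minimal standalone sketch of the reset-and-reuse pattern, assuming only the stdlib:

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func compress(s string) *bytes.Buffer {
	var b bytes.Buffer
	w := gzip.NewWriter(&b)
	w.Write([]byte(s))
	w.Close()
	return &b
}

func main() {
	// First use: allocate the decompressor and the buffer once.
	gz, _ := gzip.NewReader(compress("first")) // error handling elided in sketch
	br := bufio.NewReaderSize(gz, 4*1024)
	io.ReadAll(br) // drain

	// Reuse: reset the inner gzip.Reader onto a new source, then point
	// the bufio.Reader back at it -- no new allocations, as in GetReader.
	if err := gz.Reset(compress("second")); err != nil {
		panic(err)
	}
	br.Reset(gz)
	out, _ := io.ReadAll(br)
	fmt.Println(string(out)) // second
}
```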

// PutReader places back in the pool a CompressionReader
@@ -171,16 +175,16 @@ type FlatePool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *FlatePool) GetReader(src io.Reader) io.Reader {
func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(flate.Resetter)
err := reader.Reset(src, nil)
if err != nil {
panic(err)
}
return reader.(io.Reader)
return reader.(io.Reader), nil
}
return flate.NewReader(src)
return flate.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
@@ -219,21 +223,21 @@ type ZstdPool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *ZstdPool) GetReader(src io.Reader) io.Reader {
func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*zstd.Decoder)
err := reader.Reset(src)
if err != nil {
panic(err)
return nil, err
}
return reader
return reader, nil
}
reader, err := zstd.NewReader(src)
if err != nil {
panic(err)
return nil, err
}
runtime.SetFinalizer(reader, (*zstd.Decoder).Close)
return reader
return reader, nil
}

// PutReader places back in the pool a CompressionReader
@@ -267,16 +271,27 @@ type LZ4Pool struct {
bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set.
}

// We need to be able to see the underlying lz4.Reader to Reset it.
type lz4BufferedReader struct {
*bufio.Reader
lz4Reader *lz4.Reader
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader {
var r *lz4.Reader
func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) {
var r *lz4BufferedReader
if pooled := pool.readers.Get(); pooled != nil {
r = pooled.(*lz4.Reader)
r.Reset(src)
r = pooled.(*lz4BufferedReader)
r.lz4Reader.Reset(src)
r.Reader.Reset(r.lz4Reader)
} else {
r = lz4.NewReader(src)
lz4Reader := lz4.NewReader(src)
r = &lz4BufferedReader{
lz4Reader: lz4Reader,
Reader: bufio.NewReaderSize(lz4Reader, 4*1024),
}
}
return r
return r, nil
}

// PutReader places back in the pool a CompressionReader
@@ -315,13 +330,13 @@ type SnappyPool struct {
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *SnappyPool) GetReader(src io.Reader) io.Reader {
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*snappy.Reader)
reader.Reset(src)
return reader
return reader, nil
}
return snappy.NewReader(src)
return snappy.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
@@ -347,8 +362,8 @@ func (pool *SnappyPool) PutWriter(writer io.WriteCloser) {
type NoopPool struct{}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *NoopPool) GetReader(src io.Reader) io.Reader {
return src
func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) {
return src, nil
}

// PutReader places back in the pool a CompressionReader
@@ -367,23 +382,3 @@ func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser {

// PutWriter places back in the pool a CompressionWriter
func (pool *NoopPool) PutWriter(writer io.WriteCloser) {}

// BufioReaderPool is a bufio reader that uses sync.Pool.
type BufioReaderPool struct {
pool sync.Pool
}

// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
buf := bufPool.pool.Get().(*bufio.Reader)
if buf == nil {
return bufio.NewReaderSize(r, 4*1024)
}
buf.Reset(r)
return buf
}

// Put puts the bufio.Reader back into the pool.
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
bufPool.pool.Put(b)
}
3 changes: 2 additions & 1 deletion pkg/chunkenc/pool_test.go
@@ -36,7 +36,8 @@ func TestPool(t *testing.T) {
require.NoError(t, w.Close())

require.True(t, buf.Len() != 0, enc)
r := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes()))
require.NoError(t, err)
defer rpool.PutReader(r)
n, err := r.Read(res)
if err != nil {