diff --git a/pkg/chunkenc/decompression/context.go b/pkg/chunkenc/decompression/context.go new file mode 100644 index 0000000000000..e5221c952f730 --- /dev/null +++ b/pkg/chunkenc/decompression/context.go @@ -0,0 +1,42 @@ +package decompression + +import ( + "context" + "time" +) + +type ctxKeyType string + +const ctxKey ctxKeyType = "decompression" + +// Stats is decompression statistic +type Stats struct { + TimeDecompress time.Duration // Time spent decompressing chunks + TimeFiltering time.Duration // Time spent filtering lines + BytesDecompressed int64 // Total bytes decompressed data size + BytesCompressed int64 // Total bytes compressed read + FetchedChunks int64 // Total number of chunks fetched. +} + +// NewContext creates a new decompression context +func NewContext(ctx context.Context) context.Context { + return context.WithValue(ctx, ctxKey, &Stats{}) +} + +// GetStats returns decompression statistics from a context. +func GetStats(ctx context.Context) Stats { + d, ok := ctx.Value(ctxKey).(*Stats) + if !ok { + return Stats{} + } + return *d +} + +// Mutate mutates the current context statistic using a mutator function +func Mutate(ctx context.Context, mutator func(m *Stats)) { + d, ok := ctx.Value(ctxKey).(*Stats) + if !ok { + return + } + mutator(d) +} diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index f956ac63b8b82..2b8fcb5b2e315 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -1,6 +1,7 @@ package chunkenc import ( + "context" "sort" "time" @@ -67,7 +68,7 @@ func (c *dumbChunk) Utilization() float64 { // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) { +func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 3644a15a2e93f..e785e153d43f8 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -1,6 +1,7 @@ package chunkenc import ( + "context" "errors" "fmt" "strings" @@ -95,7 +96,7 @@ type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error - Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) + Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) Size() int Bytes() ([]byte, error) Blocks() int diff --git a/pkg/chunkenc/lazy_chunk.go b/pkg/chunkenc/lazy_chunk.go index 716cc6ea13ae4..2a0bbb852b083 100644 --- a/pkg/chunkenc/lazy_chunk.go +++ b/pkg/chunkenc/lazy_chunk.go @@ -22,7 +22,7 @@ func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direc // If the chunk is already loaded, then use that. if c.Chunk.Data != nil { lokiChunk := c.Chunk.Data.(*Facade).LokiChunk() - return lokiChunk.Iterator(from, through, direction, filter) + return lokiChunk.Iterator(ctx, from, through, direction, filter) } return nil, errors.New("chunk is not loaded") diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index aaee0a3645562..328ec9b323d05 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -3,6 +3,7 @@ package chunkenc import ( "bufio" "bytes" + "context" "encoding/binary" "fmt" "hash" @@ -12,6 +13,7 @@ import ( "github.com/pkg/errors" + "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -464,13 +466,13 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { } // Iterator implements Chunk. -func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { +func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { if maxt > b.mint && b.maxt > mint { - its = append(its, b.iterator(c.readers, filter)) + its = append(its, b.iterator(ctx, c.readers, filter)) } } @@ -491,11 +493,11 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction return iter.NewEntryIteratorBackward(iterForward) } -func (b block) iterator(pool ReaderPool, filter logql.Filter) iter.EntryIterator { +func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filter) iter.EntryIterator { if len(b.b) == 0 { return emptyIterator } - return newBufferedIterator(pool, b.b, filter) + return newBufferedIterator(ctx, pool, b.b, filter) } func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator { @@ -556,7 +558,11 @@ func (li *listIterator) Close() error { return nil } func (li *listIterator) Labels() string { return "" } type bufferedIterator struct { - origBytes []byte + origBytes []byte + rootCtx context.Context + timeDecompress time.Duration + timeFiltering time.Duration + bytesDecompressed int64 bufReader *bufio.Reader reader io.Reader @@ -574,8 +580,9 @@ type bufferedIterator struct { filter logql.Filter } -func newBufferedIterator(pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator { return &bufferedIterator{ + rootCtx: ctx, origBytes: b, reader: nil, // will be initialized later bufReader: nil, // will be initialized later @@ -593,14 +600,21 @@ func (si *bufferedIterator) Next() bool { } for { + start := time.Now() ts, line, ok := si.moveNext() + si.timeDecompress += time.Since(start) if !ok { si.Close() return false } + // we decode always the line length and ts as varint + si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64 + start = time.Now() if si.filter != nil && !si.filter(line) { + si.timeFiltering += time.Since(start) continue } + si.timeFiltering += time.Since(start) si.cur.Line = string(line) si.cur.Timestamp = time.Unix(0, ts) return true @@ -669,19 +683,28 @@ func (si *bufferedIterator) Error() error { return si.err } func (si *bufferedIterator) Close() error { if !si.closed { si.closed = true - si.pool.PutReader(si.reader) - BufReaderPool.Put(si.bufReader) - if si.buf != nil { - BytesBufferPool.Put(si.buf) - } - si.origBytes = nil - si.bufReader = nil - si.buf = nil - si.decBuf = nil - si.reader = nil - return si.err + si.close() } return si.err } +func (si *bufferedIterator) close() { + decompression.Mutate(si.rootCtx, func(current *decompression.Stats) { + current.TimeDecompress += si.timeDecompress + current.TimeFiltering += si.timeFiltering + current.BytesDecompressed += si.bytesDecompressed + current.BytesCompressed += int64(len(si.origBytes)) + }) + si.pool.PutReader(si.reader) + BufReaderPool.Put(si.bufReader) + if si.buf != nil { + BytesBufferPool.Put(si.buf) + } + si.origBytes = nil + si.bufReader = nil + si.buf = nil + si.decBuf = nil + si.reader = nil +} + func (si *bufferedIterator) Labels() string { return "" } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 23a708ae8769b..4bd2fe9acaf36 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -2,6 +2,7 @@ package chunkenc import ( "bytes" + "context" "fmt" "math" "math/rand" @@ -89,7 +90,7 @@ func TestBlock(t *testing.T) { } } - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) require.NoError(t, err) idx := 0 @@ -104,7 +105,7 @@ func TestBlock(t *testing.T) { require.Equal(t, len(cases), idx) t.Run("bounded-iteration", func(t *testing.T) { - it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil) + it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil) require.NoError(t, err) idx := 2 @@ -137,7 +138,7 @@ func TestReadFormatV1(t *testing.T) { t.Fatal(err) } - it, err := r.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) + it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) if err != nil { t.Fatal(err) } @@ -164,7 +165,7 @@ func TestRoundtripV2(t *testing.T) { assertLines := func(c *MemChunk) { require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) if err != nil { t.Fatal(err) } @@ -226,7 +227,7 @@ func TestSerialization(t *testing.T) { bc, err := NewByteChunk(byt) require.NoError(t, err) - it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) + it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) require.NoError(t, err) for i := 0; i < numSamples; i++ { require.True(t, it.Next()) @@ -271,7 +272,7 @@ func TestChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil) require.NoError(t, err) i = 0 for it.Next() { @@ -433,9 +434,7 @@ func BenchmarkRead(b *testing.B) { for n := 0; n < b.N; n++ { for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool { - return false - }) + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil) if err != nil { panic(err) } @@ -462,7 +461,7 @@ func TestGenerateDataSize(t *testing.T) { bytesRead := uint64(0) for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool { + iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool { return true // return all }) if err != nil { diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index e7ab04b6ed9d9..0f915a4ed073c 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -1,6 +1,7 @@ package ingester import ( + "context" "fmt" "math/rand" "testing" @@ -60,7 +61,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) _ = iter.Close() @@ -69,7 +70,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) _ = iter.Close() diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index ff4f0e3b2266a..c54c10e9b3054 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -250,7 +250,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk { } func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream { - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil) + it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil) require.NoError(t, err) stream := &logproto.Stream{ diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 634ba5f0ba54f..8296faed74718 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -190,7 +190,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie if err != nil { return err } - iters, err := i.lookupStreams(req, expr.Matchers(), filter) + iters, err := i.lookupStreams(queryServer.Context(), req, expr.Matchers(), filter) if err != nil { return err } @@ -221,7 +221,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro }, nil } -func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) { +func (i *instance) lookupStreams(ctx context.Context, req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) { i.streamsMtx.RLock() defer i.streamsMtx.RUnlock() @@ -240,7 +240,7 @@ outer: continue outer } } - iter, err := stream.Iterator(req.Start, req.End, req.Direction, filter) + iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter) if err != nil { return nil, err } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index fa8a8a26beecf..df1405daf5ae1 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -225,10 +225,10 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t } // Returns an iterator. -func (s *stream) Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { +func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { - itr, err := c.chunk.Iterator(from, through, direction, filter) + itr, err := c.chunk.Iterator(ctx, from, through, direction, filter) if err != nil { return nil, err } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 3febb271eada9..f3855eec9fa24 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -1,6 +1,7 @@ package ingester import ( + "context" "fmt" "math/rand" "testing" @@ -40,7 +41,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -50,7 +51,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index dbf95c89453ad..1ac61f31284cb 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -85,6 +85,7 @@ func TestTransferOut(t *testing.T) { // Get all the lines back and make sure the blocks transferred successfully for _, stream := range ing2.instances["test"].streams { it, err := stream.Iterator( + context.TODO(), time.Unix(0, 0), time.Unix(10, 0), logproto.FORWARD, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index feab29db40fd7..693ef94583bfd 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -7,6 +7,9 @@ import ( "sort" "time" + "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -144,6 +147,8 @@ func (ng *Engine) NewInstantQuery( } func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) { + log, ctx := spanlogger.New(ctx, "Engine.exec") + defer log.Finish() ctx, cancel := context.WithTimeout(ctx, ng.timeout) defer cancel() @@ -159,6 +164,19 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) { return nil, err } + ctx = decompression.NewContext(ctx) + start := time.Now() + defer func() { + stats := decompression.GetStats(ctx) + level.Debug(log).Log( + "Time Decompressing (ms)", stats.TimeDecompress.Nanoseconds()/int64(time.Millisecond), + "Time Filtering (ms)", stats.TimeFiltering.Nanoseconds()/int64(time.Millisecond), + "Fetched chunks", stats.FetchedChunks, + "Total bytes compressed (MB)", stats.BytesCompressed/1024/1024, + "Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024, + "Total exec time (ms)", time.Since(start).Nanoseconds()/int64(time.Millisecond), + ) + }() switch e := expr.(type) { case SampleExpr: if err := ng.setupIterators(ctx, e, q); err != nil { diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 5acf5ea25d8e8..348a514d3b84c 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -9,6 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -375,7 +376,9 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { lastErr = err } } - + decompression.Mutate(ctx, func(m *decompression.Stats) { + m.FetchedChunks += int64(totalChunks) + }) return lastErr }