Add decompression tracing instrumentation. (#1445)
* Add decompression tracing instrumentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix time conversion.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Use span logger so we also get a log line.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve reusability and add fetched chunks.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena authored Dec 20, 2019
1 parent 768bf91 commit fb76416
Showing 14 changed files with 131 additions and 41 deletions.
42 changes: 42 additions & 0 deletions pkg/chunkenc/decompression/context.go
@@ -0,0 +1,42 @@
package decompression

import (
	"context"
	"time"
)

type ctxKeyType string

const ctxKey ctxKeyType = "decompression"

// Stats holds decompression statistics.
type Stats struct {
	TimeDecompress    time.Duration // Time spent decompressing chunks
	TimeFiltering     time.Duration // Time spent filtering lines
	BytesDecompressed int64         // Total bytes of decompressed data
	BytesCompressed   int64         // Total bytes of compressed data read
	FetchedChunks     int64         // Total number of chunks fetched.
}

// NewContext creates a new decompression context.
func NewContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, ctxKey, &Stats{})
}

// GetStats returns decompression statistics from a context.
func GetStats(ctx context.Context) Stats {
	d, ok := ctx.Value(ctxKey).(*Stats)
	if !ok {
		return Stats{}
	}
	return *d
}

// Mutate applies a mutator function to the statistics stored in the context.
func Mutate(ctx context.Context, mutator func(m *Stats)) {
	d, ok := ctx.Value(ctxKey).(*Stats)
	if !ok {
		return
	}
	mutator(d)
}
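
For orientation, here is a minimal, hypothetical sketch of how these three helpers compose end to end. The import path is inferred from the file location above, and the recorded numbers are made up:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc/decompression"
)

func main() {
	// Wrap the request context so downstream iterators share one Stats value.
	ctx := decompression.NewContext(context.Background())

	// Anything that decompresses chunks merges its totals via Mutate;
	// the bufferedIterator changed below does this when it is closed.
	decompression.Mutate(ctx, func(s *decompression.Stats) {
		s.TimeDecompress += 120 * time.Millisecond
		s.BytesDecompressed += 1 << 20
		s.FetchedChunks++
	})

	// At the end of the request, read the aggregated snapshot.
	stats := decompression.GetStats(ctx)
	fmt.Printf("decompress=%s filtering=%s bytes=%d chunks=%d\n",
		stats.TimeDecompress, stats.TimeFiltering, stats.BytesDecompressed, stats.FetchedChunks)
}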
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -1,6 +1,7 @@
package chunkenc

import (
"context"
"sort"
"time"

@@ -67,7 +68,7 @@ func (c *dumbChunk) Utilization() float64 {

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
3 changes: 2 additions & 1 deletion pkg/chunkenc/interface.go
@@ -1,6 +1,7 @@
package chunkenc

import (
"context"
"errors"
"fmt"
"strings"
@@ -95,7 +96,7 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Blocks() int
2 changes: 1 addition & 1 deletion pkg/chunkenc/lazy_chunk.go
@@ -22,7 +22,7 @@ func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direc
// If the chunk is already loaded, then use that.
if c.Chunk.Data != nil {
lokiChunk := c.Chunk.Data.(*Facade).LokiChunk()
return lokiChunk.Iterator(from, through, direction, filter)
return lokiChunk.Iterator(ctx, from, through, direction, filter)
}

return nil, errors.New("chunk is not loaded")
57 changes: 40 additions & 17 deletions pkg/chunkenc/memchunk.go
@@ -3,6 +3,7 @@ package chunkenc
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"hash"
@@ -12,6 +13,7 @@ import (

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
@@ -464,13 +466,13 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}

// Iterator implements Chunk.
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
its = append(its, b.iterator(c.readers, filter))
its = append(its, b.iterator(ctx, c.readers, filter))
}
}

@@ -491,11 +493,11 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction
return iter.NewEntryIteratorBackward(iterForward)
}

func (b block) iterator(pool ReaderPool, filter logql.Filter) iter.EntryIterator {
func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filter) iter.EntryIterator {
if len(b.b) == 0 {
return emptyIterator
}
return newBufferedIterator(pool, b.b, filter)
return newBufferedIterator(ctx, pool, b.b, filter)
}

func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
@@ -556,7 +558,11 @@ func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }

type bufferedIterator struct {
origBytes []byte
origBytes []byte
rootCtx context.Context
timeDecompress time.Duration
timeFiltering time.Duration
bytesDecompressed int64

bufReader *bufio.Reader
reader io.Reader
@@ -574,8 +580,9 @@ type bufferedIterator struct {
filter logql.Filter
}

func newBufferedIterator(pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
return &bufferedIterator{
rootCtx: ctx,
origBytes: b,
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
@@ -593,14 +600,21 @@ func (si *bufferedIterator) Next() bool {
}

for {
start := time.Now()
ts, line, ok := si.moveNext()
si.timeDecompress += time.Since(start)
if !ok {
si.Close()
return false
}
// the line length and the timestamp are always decoded as varints, so account for them too
si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
start = time.Now()
if si.filter != nil && !si.filter(line) {
si.timeFiltering += time.Since(start)
continue
}
si.timeFiltering += time.Since(start)
si.cur.Line = string(line)
si.cur.Timestamp = time.Unix(0, ts)
return true
@@ -669,19 +683,28 @@ func (si *bufferedIterator) Error() error { return si.err }
func (si *bufferedIterator) Close() error {
if !si.closed {
si.closed = true
si.pool.PutReader(si.reader)
BufReaderPool.Put(si.bufReader)
if si.buf != nil {
BytesBufferPool.Put(si.buf)
}
si.origBytes = nil
si.bufReader = nil
si.buf = nil
si.decBuf = nil
si.reader = nil
return si.err
si.close()
}
return si.err
}

func (si *bufferedIterator) close() {
decompression.Mutate(si.rootCtx, func(current *decompression.Stats) {
current.TimeDecompress += si.timeDecompress
current.TimeFiltering += si.timeFiltering
current.BytesDecompressed += si.bytesDecompressed
current.BytesCompressed += int64(len(si.origBytes))
})
si.pool.PutReader(si.reader)
BufReaderPool.Put(si.bufReader)
if si.buf != nil {
BytesBufferPool.Put(si.buf)
}
si.origBytes = nil
si.bufReader = nil
si.buf = nil
si.decBuf = nil
si.reader = nil
}

func (si *bufferedIterator) Labels() string { return "" }
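
The instrumentation above follows an accumulate-locally, flush-once pattern: per-entry timings and byte counts land in plain fields on the iterator, so the hot Next() loop never touches the shared *Stats, and the totals are merged exactly once in close(). A condensed, hypothetical sketch of that shape (names here are illustrative, not the actual Loki types):

package chunkenc

import (
	"context"
	"time"

	"github.com/grafana/loki/pkg/chunkenc/decompression"
)

// instrumentedIter is a hypothetical reduction of bufferedIterator to the
// pieces that matter for the tracing instrumentation.
type instrumentedIter struct {
	ctx            context.Context // carries the per-request *decompression.Stats
	timeDecompress time.Duration
	timeFiltering  time.Duration
	bytesRead      int64
}

// next times the decode and filter steps separately, touching only local fields.
func (it *instrumentedIter) next(decode func() ([]byte, bool), filter func([]byte) bool) ([]byte, bool) {
	start := time.Now()
	line, ok := decode()
	it.timeDecompress += time.Since(start)
	if !ok {
		return nil, false
	}
	it.bytesRead += int64(len(line))

	start = time.Now()
	keep := filter == nil || filter(line)
	it.timeFiltering += time.Since(start)
	return line, keep
}

// close merges the local counters into the shared per-request statistics,
// exactly once, keeping contention and context lookups out of the hot path.
func (it *instrumentedIter) close() {
	decompression.Mutate(it.ctx, func(s *decompression.Stats) {
		s.TimeDecompress += it.timeDecompress
		s.TimeFiltering += it.timeFiltering
		s.BytesDecompressed += it.bytesRead
	})
}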
19 changes: 9 additions & 10 deletions pkg/chunkenc/memchunk_test.go
@@ -2,6 +2,7 @@ package chunkenc

import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
@@ -89,7 +90,7 @@ func TestBlock(t *testing.T) {
}
}

it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
require.NoError(t, err)

idx := 0
@@ -104,7 +105,7 @@
require.Equal(t, len(cases), idx)

t.Run("bounded-iteration", func(t *testing.T) {
it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil)
it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil)
require.NoError(t, err)

idx := 2
@@ -137,7 +138,7 @@ func TestReadFormatV1(t *testing.T) {
t.Fatal(err)
}

it, err := r.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
if err != nil {
t.Fatal(err)
}
@@ -164,7 +165,7 @@ func TestRoundtripV2(t *testing.T) {

assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
if err != nil {
t.Fatal(err)
}
@@ -226,7 +227,7 @@ func TestSerialization(t *testing.T) {
bc, err := NewByteChunk(byt)
require.NoError(t, err)

it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
@@ -271,7 +272,7 @@ func TestChunkFilling(t *testing.T) {

require.Equal(t, int64(lines), i)

it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil)
require.NoError(t, err)
i = 0
for it.Next() {
@@ -433,9 +434,7 @@ func BenchmarkRead(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool {
return false
})
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil)
if err != nil {
panic(err)
}
@@ -462,7 +461,7 @@ func TestGenerateDataSize(t *testing.T) {
bytesRead := uint64(0)
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool {
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, func(line []byte) bool {
return true // return all
})
if err != nil {
5 changes: 3 additions & 2 deletions pkg/ingester/chunk_test.go
@@ -1,6 +1,7 @@
package ingester

import (
"context"
"fmt"
"math/rand"
"testing"
@@ -60,7 +61,7 @@ func TestIterator(t *testing.T) {
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil)
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
_ = iter.Close()
@@ -69,7 +70,7 @@
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil)
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))
_ = iter.Close()
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -250,7 +250,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
}

func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil)
it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil)
require.NoError(t, err)

stream := &logproto.Stream{
6 changes: 3 additions & 3 deletions pkg/ingester/instance.go
@@ -190,7 +190,7 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
if err != nil {
return err
}
iters, err := i.lookupStreams(req, expr.Matchers(), filter)
iters, err := i.lookupStreams(queryServer.Context(), req, expr.Matchers(), filter)
if err != nil {
return err
}
@@ -221,7 +221,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}

func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) {
func (i *instance) lookupStreams(ctx context.Context, req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()

@@ -240,7 +240,7 @@ outer:
continue outer
}
}
iter, err := stream.Iterator(req.Start, req.End, req.Direction, filter)
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions pkg/ingester/stream.go
@@ -225,10 +225,10 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t
}

// Returns an iterator.
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
itr, err := c.chunk.Iterator(from, through, direction, filter)
itr, err := c.chunk.Iterator(ctx, from, through, direction, filter)
if err != nil {
return nil, err
}
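
The remaining files in this commit (not reproduced on this page) wire these statistics into the query path; per the commit message, a span logger is used there so the numbers land both on the trace and in a log line. As a hedged illustration of the consumer side only, using plain OpenTracing rather than whatever helper the commit actually uses:

package querier

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"

	"github.com/grafana/loki/pkg/chunkenc/decompression"
)

// reportDecompressionStats is hypothetical: it shows where the statistics
// accumulated by the iterators could be attached to the active trace span.
func reportDecompressionStats(ctx context.Context) {
	stats := decompression.GetStats(ctx)
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		return
	}
	span.LogKV(
		"timeDecompress", stats.TimeDecompress,
		"timeFiltering", stats.TimeFiltering,
		"bytesDecompressed", stats.BytesDecompressed,
		"bytesCompressed", stats.BytesCompressed,
		"fetchedChunks", stats.FetchedChunks,
	)
}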