From e9acb2ac08630e9c63464a22355f93adf3d38514 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 9 Jan 2020 12:35:13 -0500 Subject: [PATCH] pre-fetch the next chunk in parallel to processing the current chunk such that we aren't waiting for fetches from GCS and spend all our CPU time decompressing and processing Signed-off-by: Edward Welch --- pkg/storage/iterator.go | 54 +++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 348a514d3b84..80ad8573361c 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -6,15 +6,17 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" ) // lazyChunks is a slice of lazy chunks that can ordered by chunk boundaries @@ -71,14 +73,18 @@ type batchChunkIterator struct { lastOverlapping []*chunkenc.LazyChunk ctx context.Context + cancel context.CancelFunc matchers []*labels.Matcher filter logql.Filter req *logproto.QueryRequest + next chan *struct { + iter iter.EntryIterator + err error + } } // newBatchChunkIterator creates a new batch iterator with the given batchSize. func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator { - // __name__ is not something we filter by because it's a constant in loki and only used for upstream compatibility. 
// Therefore remove it for i := range matchers { @@ -87,19 +93,45 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba break } } - + ctx, cancel := context.WithCancel(ctx) res := &batchChunkIterator{ batchSize: batchSize, matchers: matchers, filter: filter, req: req, ctx: ctx, + cancel: cancel, chunks: lazyChunks{direction: req.Direction, chunks: chunks}, + next: make(chan *struct { + iter iter.EntryIterator + err error + }), } sort.Sort(res.chunks) + go func() { + for { + if res.chunks.Len() == 0 { + close(res.next) + return + } + next, err := res.nextBatch() + select { + case <-ctx.Done(): + close(res.next) + if next != nil { + if err := next.Close(); err != nil { + level.Error(util.Logger).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err) + } + } + return + case res.next <- &struct { + iter iter.EntryIterator + err error + }{next, err}: + } + } + }() return res } - func (it *batchChunkIterator) Next() bool { - var err error // for loop to avoid recursion @@ -107,15 +139,16 @@ func (it *batchChunkIterator) Next() bool { if it.curr != nil && it.curr.Next() { return true } - if it.chunks.Len() == 0 { - return false - } // close previous iterator if it.curr != nil { it.err = it.curr.Close() } - it.curr, err = it.nextBatch() - if err != nil { + next := <-it.next + if next == nil { + return false + } + it.curr = next.iter + if next.err != nil { - it.err = err + it.err = next.err return false } @@ -231,6 +264,7 @@ func (it *batchChunkIterator) Error() error { } func (it *batchChunkIterator) Close() error { + it.cancel() if it.curr != nil { return it.curr.Close() }