Skip to content

Commit

Permalink
pre-fetch the next chunk in parallel to processing the current chunk …
Browse files Browse the repository at this point in the history
…so that we aren't waiting on fetches from GCS and can instead spend all our CPU time decompressing and processing

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean committed Jan 9, 2020
1 parent fa63c09 commit e9acb2a
Showing 1 changed file with 44 additions and 10 deletions.
54 changes: 44 additions & 10 deletions pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)

// lazyChunks is a slice of lazy chunks that can be ordered by chunk boundaries
Expand Down Expand Up @@ -71,14 +73,18 @@ type batchChunkIterator struct {
lastOverlapping []*chunkenc.LazyChunk

ctx context.Context
cancel context.CancelFunc
matchers []*labels.Matcher
filter logql.Filter
req *logproto.QueryRequest
next chan *struct {
iter iter.EntryIterator
err error
}
}

// newBatchChunkIterator creates a new batch iterator with the given batchSize.
func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator {

// __name__ is not something we filter by because it's a constant in loki and only used for upstream compatibility.
// Therefore remove it
for i := range matchers {
Expand All @@ -87,35 +93,62 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba
break
}
}

ctx, cancel := context.WithCancel(ctx)
res := &batchChunkIterator{
batchSize: batchSize,
matchers: matchers,
filter: filter,
req: req,
ctx: ctx,
cancel: cancel,
chunks: lazyChunks{direction: req.Direction, chunks: chunks},
next: make(chan *struct {
iter iter.EntryIterator
err error
}),
}
sort.Sort(res.chunks)
go func() {
for {
if res.chunks.Len() == 0 {
close(res.next)
return
}
next, err := res.nextBatch()
select {
case <-ctx.Done():
close(res.next)
err = next.Close()
if err != nil {
level.Error(util.Logger).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err)
}
return
case res.next <- &struct {
iter iter.EntryIterator
err error
}{next, err}:
}
}
}()
return res
}

func (it *batchChunkIterator) Next() bool {
var err error
// for loop to avoid recursion
for {
if it.curr != nil && it.curr.Next() {
return true
}
if it.chunks.Len() == 0 {
return false
}
// close previous iterator
if it.curr != nil {
it.err = it.curr.Close()
}
it.curr, err = it.nextBatch()
if err != nil {
next := <-it.next
if next == nil {
return false
}
it.curr = next.iter
if next.err != nil {
it.err = err
return false
}
Expand Down Expand Up @@ -231,6 +264,7 @@ func (it *batchChunkIterator) Error() error {
}

func (it *batchChunkIterator) Close() error {
it.cancel()
if it.curr != nil {
return it.curr.Close()
}
Expand Down

0 comments on commit e9acb2a

Please sign in to comment.