Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bloom-gw): Add metrics.go style log line to bloom gateway FilterChunks call #12354

Merged
merged 9 commits into from
Mar 26, 2024
46 changes: 27 additions & 19 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
Expand Down Expand Up @@ -206,34 +205,45 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
log.With(g.logger, "tenant", tenantID),
"bloomgateway.FilterChunkRefs",
)
defer sp.Finish()

stats, ctx := ContextWithEmptyStats(ctx)
defer func() {
level.Info(sp).Log(stats.KVArgs()...)
sp.Finish()
}()

// start time == end time --> empty response
if req.From.Equal(req.Through) {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
}

// start time > end time --> error response
if req.Through.Before(req.From) {
stats.Status = labelFailure
return nil, errors.New("from time must not be after through time")
}

filters := v1.ExtractTestableLineFilters(req.Plan.AST)
stats.NumFilters = len(filters)
g.metrics.receivedFilters.Observe(float64(len(filters)))

// Shortcut if request does not contain filters
if len(filters) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
}

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)

// no tasks --> empty response
if len(seriesByDay) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
Expand All @@ -255,15 +265,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))

level.Debug(sp).Log(
"msg", "created task for day",
"task", task.ID,
"day", seriesForDay.day,
"interval", seriesForDay.interval.String(),
"nSeries", len(seriesForDay.series),
"filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }),
)
tasks = append(tasks, task)
}

Expand All @@ -285,13 +286,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
}); err != nil {
stats.Status = labelFailure
return nil, errors.Wrap(err, "failed to enqueue task")
}
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)
}

sp.Log("enqueue_duration", time.Since(queueStart).String())
sp.Log("msg", "enqueued tasks", "duration", time.Since(queueStart).String())

remaining := len(tasks)

Expand All @@ -305,10 +307,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for remaining > 0 {
select {
case <-ctx.Done():
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses)
Expand All @@ -318,7 +322,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

sp.Log("msg", "received all responses")

start := time.Now()
filtered := filterChunkRefs(req, responses)
duration := time.Since(start)
stats.AddPostProcessingTime(duration)

// free up the responses
for _, resp := range responses {
Expand All @@ -335,13 +342,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))

level.Info(sp).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,
"requested_chunks", preFilterChunks,
"filtered_chunks", preFilterChunks-postFilterChunks,
)
stats.Status = "success"
stats.SeriesRequested = preFilterSeries
stats.SeriesFiltered = preFilterSeries - postFilterSeries
stats.ChunksRequested = preFilterChunks
stats.ChunksFiltered = preFilterChunks - postFilterChunks

sp.Log("msg", "return filtered chunk refs")

return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil
}

Expand Down
47 changes: 31 additions & 16 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.M
"msg", "process tasks with bounds",
"tenant", tenant,
"tasks", len(tasks),
"bounds", JoinFunc(bounds, ",", func(e v1.FingerprintBounds) string { return e.String() }),
"bounds", len(bounds),
)

for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
Expand Down Expand Up @@ -73,23 +73,30 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
Interval: interval,
Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max),
}

start := time.Now()
metas, err := p.store.FetchMetas(ctx, metaSearch)
duration := time.Since(start)
level.Debug(p.logger).Log("msg", "fetched metas", "count", len(metas), "duration", duration, "err", err)

for _, t := range tasks {
FromContext(t.ctx).AddMetasFetchTime(duration)
}

if err != nil {
return err
}

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
level.Info(p.logger).Log("msg", "blocks for metas", "num_metas", len(metas), "num_blocks", len(blocksRefs))
return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs))
}

func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error {
data := partitionTasks(tasks, blocksRefs)

refs := make([]bloomshipper.BlockRef, 0, len(data))
for _, block := range data {
refs = append(refs, block.ref)
}

start := time.Now()
start = time.Now()
bqs, err := p.store.FetchBlocks(
ctx,
refs,
Expand All @@ -101,12 +108,21 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
// the underlying bloom []byte outside of iteration
bloomshipper.WithPool(true),
)
level.Debug(p.logger).Log("msg", "fetch blocks", "count", len(bqs), "duration", time.Since(start), "err", err)
duration = time.Since(start)
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)

for _, t := range tasks {
FromContext(t.ctx).AddBlocksFetchTime(duration)
}

if err != nil {
return err
}

return p.processBlocks(ctx, bqs, data)
}

func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error {
defer func() {
for i := range bqs {
if bqs[i] == nil {
Expand All @@ -124,13 +140,6 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
}

block := data[i]
level.Debug(p.logger).Log(
"msg", "process block with tasks",
"job", i+1,
"of_jobs", len(bqs),
"block", block.ref,
"num_tasks", len(block.tasks),
)

if !block.ref.Bounds.Equal(bq.Bounds) {
return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
Expand Down Expand Up @@ -178,10 +187,16 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie

start := time.Now()
err = fq.Run()
duration := time.Since(start)

if err != nil {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(time.Since(start).Seconds())
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(duration.Seconds())
} else {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(time.Since(start).Seconds())
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(duration.Seconds())
}

for _, task := range tasks {
FromContext(task.ctx).AddProcessingTime(duration)
}

return err
Expand Down
90 changes: 90 additions & 0 deletions pkg/bloomgateway/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package bloomgateway

import (
"context"
"time"

"go.uber.org/atomic"
)

// Stats collects counters and timings for a single bloom gateway
// FilterChunkRefs request so they can be emitted as one metrics.go-style
// summary log line at the end of the request.
type Stats struct {
	// Status is the request outcome label (e.g. "unknown" until set).
	Status string
	// NumTasks is the number of per-day tasks the request was split into;
	// NumFilters is the number of testable line filters in the query plan.
	NumTasks   int
	NumFilters int
	// Series/chunk counters recorded before and after bloom filtering.
	ChunksRequested int
	ChunksFiltered  int
	SeriesRequested int
	SeriesFiltered  int
	// Timings are atomic because worker goroutines processing the
	// request's tasks accumulate into them concurrently.
	QueueTime          atomic.Duration
	MetasFetchTime     atomic.Duration
	BlocksFetchTime    atomic.Duration
	ProcessingTime     atomic.Duration
	PostProcessingTime atomic.Duration
}

// statsKey is an unexported key type so that values stored under it in a
// context.Context cannot collide with keys from other packages.
type statsKey int

// ctxKey is the single key under which a *Stats is carried in a Context.
// The zero value is used, matching statsKey(0).
var ctxKey statsKey

// ContextWithEmptyStats creates a fresh Stats with status "unknown" and
// returns it together with a child context that carries it, so downstream
// code can record counters and timings via FromContext.
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
	s := &Stats{Status: "unknown"}
	return s, context.WithValue(ctx, ctxKey, s)
}

// FromContext gets the Stats out of the Context. Returns nil if stats have
// not been initialised in the context.
func FromContext(ctx context.Context) *Stats {
	// The key type is unexported, so any value stored under ctxKey is a
	// *Stats; the comma-ok form simply yields nil when nothing is stored.
	if s, ok := ctx.Value(ctxKey).(*Stats); ok {
		return s
	}
	return nil
}

// KVArgs renders the stats as alternating key/value pairs suitable for a
// go-kit style Log(...) call. Safe to call on a nil receiver, in which case
// an empty slice is returned.
func (s *Stats) KVArgs() []any {
	if s == nil {
		return []any{}
	}
	kv := make([]any, 0, 22)
	kv = append(kv, "status", s.Status)
	kv = append(kv, "tasks", s.NumTasks)
	kv = append(kv, "series_requested", s.SeriesRequested)
	kv = append(kv, "series_filtered", s.SeriesFiltered)
	kv = append(kv, "chunks_requested", s.ChunksRequested)
	kv = append(kv, "chunks_filtered", s.ChunksFiltered)
	kv = append(kv, "queue_time", s.QueueTime.Load())
	kv = append(kv, "metas_fetch_time", s.MetasFetchTime.Load())
	kv = append(kv, "blocks_fetch_time", s.BlocksFetchTime.Load())
	kv = append(kv, "processing_time", s.ProcessingTime.Load())
	kv = append(kv, "post_processing_time", s.PostProcessingTime.Load())
	return kv
}

// AddQueueTime atomically accumulates time the request's tasks spent
// waiting in the queue. A nil receiver is a no-op.
func (s *Stats) AddQueueTime(t time.Duration) {
	if s != nil {
		s.QueueTime.Add(t)
	}
}

// AddMetasFetchTime atomically accumulates time spent fetching metas from
// the store. A nil receiver is a no-op.
func (s *Stats) AddMetasFetchTime(t time.Duration) {
	if s != nil {
		s.MetasFetchTime.Add(t)
	}
}

// AddBlocksFetchTime atomically accumulates time spent fetching bloom
// blocks from the store. A nil receiver is a no-op.
func (s *Stats) AddBlocksFetchTime(t time.Duration) {
	if s != nil {
		s.BlocksFetchTime.Add(t)
	}
}

// AddProcessingTime atomically accumulates time spent running the bloom
// filter queries against blocks. A nil receiver is a no-op.
func (s *Stats) AddProcessingTime(t time.Duration) {
	if s != nil {
		s.ProcessingTime.Add(t)
	}
}

// AddPostProcessingTime atomically accumulates time spent filtering chunk
// refs after all task responses were received. A nil receiver is a no-op.
func (s *Stats) AddPostProcessingTime(t time.Duration) {
	if s != nil {
		s.PostProcessingTime.Add(t)
	}
}
1 change: 1 addition & 0 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (w *worker) running(_ context.Context) error {
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
_ = w.pending.Dec()
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
tasks = append(tasks, task)

first, last := getFirstLast(task.series)
Expand Down
Loading