From c1fada9af01fded6c4e08c5bd54917a8dad5d744 Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Fri, 21 Jun 2024 15:24:39 -0300 Subject: [PATCH] refactor: Remove unnecessary spanlogger usage (#13255) Historically, we've been using `spanlogger` to enrich our spans. That isn't the right usage for it (as of now), as they'll be emitting log lines. Our current usage is causing: - Noisy/important log lines less visible - Unnecessary logging volume - Spread of the bad practice. With this PR I'm moving away from `spanlogger` all places where the log message is too bad or nonexistent. That's because these cases are indicating we don't care about that log line at all, and only about the data injected in the span. --- pkg/logql/engine.go | 4 +-- pkg/logql/metrics.go | 1 - pkg/logql/metrics_test.go | 2 +- pkg/logqlmodel/stats/context.go | 29 ++++++++------- pkg/querier/http.go | 35 +++++++++++-------- pkg/querier/querier.go | 17 +++++---- pkg/querier/queryrange/downstreamer.go | 5 +-- pkg/querier/queryrange/limits.go | 6 +--- .../queryrange/queryrangebase/query_range.go | 6 ++-- pkg/querier/queryrange/querysharding.go | 13 +++---- pkg/querier/queryrange/shard_resolver.go | 4 +-- pkg/querier/queryrange/stats.go | 4 ++- pkg/storage/async_store.go | 7 ++-- pkg/storage/chunk/cache/resultscache/cache.go | 11 ++---- .../chunk/client/util/parallel_chunk_fetch.go | 11 +++--- pkg/storage/stores/composite_store_entry.go | 7 +--- .../stores/series/series_index_store.go | 33 +++++++---------- .../stores/shipper/bloomshipper/client.go | 13 +++---- .../indexshipper/storage/cached_client.go | 28 ++++++++------- 19 files changed, 112 insertions(+), 124 deletions(-) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index fcbfcb450e683..8b46ed4d833fb 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -36,7 +36,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" logutil "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/server" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int { func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec") defer sp.Finish() - spLogger := spanlogger.FromContext(ctx) sp.LogKV( "type", GetRangeType(q.params), @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data)) - statResult.Log(level.Debug(spLogger)) + sp.LogKV(statResult.KVList()...) status, _ := server.ClientHTTPStatusAndError(err) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 052446c6b5b74..6f35f84b3a3ae 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "query", query, "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) - level.Info(logger).Log(logValues...) execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 44094e27f5d4b..577627a202036 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) { func TestLogLabelsQuery(t *testing.T) { buf := bytes.NewBufferString("") - logger := log.NewLogfmtLogger(buf) tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter()) + logger := log.NewLogfmtLogger(buf) defer c.Close() opentracing.SetGlobalTracer(tr) sp := opentracing.StartSpan("") diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 18794fb137fe8..a0509be31f6d2 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -26,6 +26,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/go-kit/log" ) @@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache { return stats } -// Log logs a query statistics result. -func (r Result) Log(log log.Logger) { - _ = log.Log( +func (r Result) Log(logger log.Logger) { + logger.Log(r.KVList()...) +} + +func (r Result) KVList() []any { + result := []any{ "Ingester.TotalReached", r.Ingester.TotalReached, "Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched, "Ingester.TotalBatches", r.Ingester.TotalBatches, @@ -549,13 +553,14 @@ func (r Result) Log(log log.Logger) { "Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)), "Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates, "Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured, - ) - r.Caches.Log(log) - r.Summary.Log(log) + } + + result = append(result, r.Caches.kvList()...) + return append(result, r.Summary.kvList()...) } -func (s Summary) Log(log log.Logger) { - _ = log.Log( +func (s Summary) kvList() []any { + return []any{ "Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)), "Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond, "Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)), @@ -563,11 +568,11 @@ func (s Summary) Log(log log.Logger) { "Summary.PostFilterLines", s.TotalPostFilterLines, "Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime), "Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime), - ) + } } -func (c Caches) Log(log log.Logger) { - _ = log.Log( +func (c Caches) kvList() []any { + return []any{ "Cache.Chunk.Requests", c.Chunk.Requests, "Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested, "Cache.Chunk.EntriesFound", c.Chunk.EntriesFound, @@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) { "Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)), "Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)), "Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(), - ) + } } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 021cbbfe7ea42..302c1c42814b3 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques resLength = len(resp.Values) } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) + logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ } statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) + logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) return resp, statResult, err } @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) + logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) return resp, err } @@ -327,8 +330,9 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ statResult := statsCtx.Result(time.Since(start), queueTime, resLength) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { @@ -336,7 +340,7 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ } logql.RecordShardsQueryMetrics( - ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, + ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult, ) return resp, err @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) statResult := statsCtx.Result(time.Since(start), queueTime, 1) - log := spanlogger.FromContext(ctx) - statResult.Log(level.Debug(log)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV(statResult.KVList()...) + } status := 200 if err != nil { status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) return resp, nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 905124fb7c304..0cd101cd6eebf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) + sp := opentracing.SpanFromContext(ctx) iters := []iter.EntryIterator{} if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { // Make a copy of the request before modifying @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec } newParams.Start = ingesterQueryInterval.start newParams.End = ingesterQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying ingester", - "params", newParams) + if sp != nil { + sp.LogKV( + "msg", "querying ingester", + "params", newParams) + } ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams) if err != nil { return nil, err @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { params.Start = storeQueryInterval.start params.End = storeQueryInterval.end - level.Debug(spanlogger.FromContext(ctx)).Log( - "msg", "querying store", - "params", params) + if sp != nil { + sp.LogKV( + "msg", "querying store", + "params", params) + } storeIter, err := q.store.SelectLogs(ctx, params) if err != nil { return nil, err diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 9f946a3247e98..f4b4b18cac753 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -6,7 +6,6 @@ import ( "reflect" "time" - "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" @@ -19,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue } sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance") defer sp.Finish() - logger := spanlogger.FromContext(ctx) - level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") + sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") res, err := in.handler.Do(ctx, req) if err != nil { diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 68f71680dd676..695c0d5346fa4 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware( func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest") defer sp.Finish() - log := spanlogger.FromContextWithFallback(ctx, q.logger) - defer log.Finish() expr, err := syntax.ParseExpr(r.GetQuery()) if err != nil { @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra combinedStats := stats.MergeStats(matcherStats...) - level.Debug(log).Log( + level.Debug(q.logger).Log( append( combinedStats.LoggingKeyValues(), "msg", "queried index", @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr) } - - level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr) } return q.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 44ac64f021a99..bb85f1a191247 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -17,13 +17,13 @@ import ( "github.com/grafana/dskit/httpgrpc" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) // StatusSuccess Prometheus success result. @@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R } sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() buf, err := bodyBuffer(r) if err != nil { log.Error(err) return nil, err } - log.LogFields(otlog.Int("bytes", len(buf))) + sp.LogKV(otlog.Int("bytes", len(buf))) var resp PrometheusResponse if err := json.Unmarshal(buf, &resp); err != nil { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 9005271cc990b..07da7abfb6518 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard } func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - logger := spanlogger.FromContextWithFallback( + logger := util_log.WithContext(ctx, ast.logger) + spLogger := spanlogger.FromContextWithFallback( ctx, - util_log.WithContext(ctx, ast.logger), + logger, ) params, err := ParamsFromRequest(r) @@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression()) if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request") return ast.next.Do(ctx, r) } conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset))) // cannot shard with this timerange if err != nil { - level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request") + level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request") return ast.next.Do(ctx, r) } @@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que v := ast.limits.TSDBShardingStrategy(tenants[0]) version, err := logql.ParseShardVersion(v) if err != nil { - level.Warn(logger).Log( + level.Warn(spLogger).Log( "msg", "failed to parse shard version", "fallback", version.String(), "err", err.Error(), @@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression()) if err != nil { - level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) + level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) return nil, err } level.Debug(logger).Log("no-op", noop, "mapped", parsed.String()) diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index ab4d23e830eb7..33438c3717817 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/v3/pkg/storage/types" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -141,8 +142,6 @@ func getStatsForMatchers( func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.GetStats") defer sp.Finish() - log := spanlogger.FromContext(r.ctx) - defer log.Finish() start := time.Now() @@ -159,6 +158,7 @@ func (r *dynamicShardResolver) GetStats(e syntax.Expr) (stats.Stats, error) { grps = append(grps, syntax.MatcherRange{}) } + log := util_log.WithContext(ctx, util_log.Logger) results, err := getStatsForMatchers(ctx, log, r.statsHandler, r.from, r.through, grps, r.maxParallelism, r.defaultLookback) if err != nil { return stats.Stats{}, err diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 67ca803d52964..f890242d15d15 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -195,7 +195,9 @@ func StatsCollectorMiddleware() queryrangebase.Middleware { // Re-calculate the summary: the queueTime result is already merged so should not be updated // Log and record metrics for the current query responseStats.ComputeSummary(time.Since(start), 0, totalEntries) - responseStats.Log(level.Debug(logger)) + if logger.Span != nil { + logger.Span.LogKV(responseStats.KVList()...) + } } ctxValue := ctx.Value(ctxKey) if data, ok := ctxValue.(*queryData); ok { diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index ed3c9dab6b422..49fe26612ec69 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -25,7 +25,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) type IngesterQuerier interface { @@ -73,8 +72,6 @@ func (a *AsyncStore) GetChunks(ctx context.Context, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup, ) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { - spanLogger := spanlogger.FromContext(ctx) - errs := make(chan error) var storeChunks [][]chunk.Chunk @@ -98,7 +95,9 @@ func (a *AsyncStore) GetChunks(ctx context.Context, ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, predicate.Matchers...) if err == nil { - level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks)) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("ingester-chunks-count", len(ingesterChunks)) + } level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks)) } errs <- err diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index d6e153cf693b0..aaf1d47fa88eb 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -21,8 +21,8 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/math" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -183,8 +183,6 @@ func (s ResultsCache) handleHit(ctx context.Context, r Request, extents []Extent ) sp, ctx := opentracing.StartSpanFromContext(ctx, "handleHit") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() requests, responses, err := s.partition(r, extents) if err != nil { @@ -426,14 +424,11 @@ func (s ResultsCache) get(ctx context.Context, key string) ([]Extent, bool) { var resp CachedResponse sp, ctx := opentracing.StartSpanFromContext(ctx, "unmarshal-extent") //nolint:ineffassign,staticcheck defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("bytes", len(bufs[0]))) + sp.LogFields(otlog.Int("bytes", len(bufs[0]))) if err := proto.Unmarshal(bufs[0], &resp); err != nil { - level.Error(log).Log("msg", "error unmarshalling cached value", "err", err) - log.Error(err) + level.Error(util_log.Logger).Log("msg", "error unmarshalling cached value", "err", err) return nil, false } diff --git a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go index c61fdcf1bd522..40a9af1ddfa94 100644 --- a/pkg/storage/chunk/client/util/parallel_chunk_fetch.go +++ b/pkg/storage/chunk/client/util/parallel_chunk_fetch.go @@ -4,11 +4,12 @@ import ( "context" "sync" + "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/util/spanlogger" + util_log "github.com/grafana/loki/v3/pkg/util/log" ) var decodeContextPool = sync.Pool{ @@ -21,9 +22,7 @@ var decodeContextPool = sync.Pool{ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "GetParallelChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Finish() - log.LogFields(otlog.Int("requested", len(chunks))) + sp.LogFields(otlog.Int("requested", len(chunks))) if ctx.Err() != nil { return nil, ctx.Err() @@ -67,9 +66,9 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun } } - log.LogFields(otlog.Int("fetched", len(result))) + sp.LogFields(otlog.Int("fetched", len(result))) if lastErr != nil { - log.Error(lastErr) + level.Error(util_log.Logger).Log("msg", "error fetching chunks", "err", lastErr) } // Return any chunks we did receive: a partial result may be useful diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 5b8db237f4c33..2a376e45259de 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" "github.com/grafana/loki/v3/pkg/util/validation" ) @@ -114,8 +113,6 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { @@ -123,7 +120,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, } else if shortcut { return nil, nil } - level.Debug(log).Log("metric", metricName) + sp.LogKV("metric", metricName) return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...) } @@ -131,8 +128,6 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through) if err != nil { diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index c3ef58266e82f..9fb64fe9b85b8 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/extract" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) var ( @@ -319,15 +318,13 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Fetch the series IDs from the index seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers) if err != nil { return nil, err } - level.Debug(log).Log("series-ids", len(seriesIDs)) + sp.LogKV("series-ids", len(seriesIDs)) // Lookup the series in the index to get label names. labelNames, err := c.lookupLabelNamesBySeries(ctx, from, through, userID, seriesIDs) @@ -336,10 +333,10 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID if err == series_index.ErrNotSupported { return c.lookupLabelNamesByChunks(ctx, from, through, userID, seriesIDs) } - level.Error(log).Log("msg", "lookupLabelNamesBySeries", "err", err) + sp.LogKV("msg", "lookupLabelNamesBySeries", "err", err) return nil, err } - level.Debug(log).Log("labelNames", len(labelNames)) + sp.LogKV("labelNames", len(labelNames)) return labelNames, nil } @@ -347,14 +344,12 @@ func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() if len(matchers) != 0 { return c.labelValuesForMetricNameWithMatchers(ctx, userID, from, through, metricName, labelName, matchers...) } - level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName) + sp.LogKV("from", from, "through", through, "metricName", metricName, "labelName", labelName) queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) if err != nil { @@ -634,10 +629,8 @@ func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() - level.Debug(log).Log("seriesIDs", len(seriesIDs)) + sp.LogKV("seriesIDs", len(seriesIDs)) queries := make([]series_index.Query, 0, len(seriesIDs)) for _, seriesID := range seriesIDs { qs, err := c.schema.GetLabelNamesForSeries(from, through, userID, []byte(seriesID)) @@ -646,7 +639,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, } queries = append(queries, qs...) } - level.Debug(log).Log("queries", len(queries)) + sp.LogKV("queries", len(queries)) entries := entriesPool.Get().(*[]series_index.Entry) defer entriesPool.Put(entries) err := c.lookupEntriesByQueries(ctx, queries, entries) @@ -654,7 +647,7 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, return nil, err } - level.Debug(log).Log("entries", len(*entries)) + sp.LogKV("entries", len(*entries)) var result util.UniqueStrings for _, entry := range *entries { @@ -671,34 +664,32 @@ func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks") defer sp.Finish() - log := spanlogger.FromContext(ctx) - defer log.Span.Finish() // Lookup the series in the index to get the chunks. chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs) if err != nil { - level.Error(log).Log("msg", "lookupChunksBySeries", "err", err) + sp.LogKV("msg", "lookupChunksBySeries", "err", err) return nil, err } - level.Debug(log).Log("chunk-ids", len(chunkIDs)) + sp.LogKV("chunk-ids", len(chunkIDs)) chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) if err != nil { - level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err) + sp.LogKV("err", "convertChunkIDsToChunks", "err", err) return nil, err } // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint filtered := filterChunksByTime(from, through, chunks) filtered = filterChunksByUniqueFingerprint(filtered) - level.Debug(log).Log("Chunks post filtering", len(chunks)) + sp.LogKV("Chunks post filtering", len(chunks)) chunksPerQuery.Observe(float64(len(filtered))) // Now fetch the actual chunk data from Memcache / S3 allChunks, err := c.fetcher.FetchChunks(ctx, filtered) if err != nil { - level.Error(log).Log("msg", "FetchChunks", "err", err) + sp.LogKV("msg", "FetchChunks", "err", err) return nil, err } return labelNamesFromChunks(allChunks), nil diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index ce70ce172c02f..f6da2168ae91f 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -23,7 +24,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/util/encoding" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -494,15 +494,16 @@ func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Dura func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var ( - logger = spanlogger.FromContext(ctx) start = time.Now() cacheDur time.Duration ) defer func() { - logger.LogKV( - "cache_duration", cacheDur, - "total_duration", time.Since(start), - ) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV( + "cache_duration", cacheDur, + "total_duration", time.Since(start), + ) + } }() if delimiter != "" { diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go index c7d909bc09844..2aa3cfda87ea1 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go @@ -9,11 +9,11 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/opentracing/opentracing-go" "golang.org/x/sync/singleflight" "github.com/grafana/loki/v3/pkg/storage/chunk/client" util_log "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/spanlogger" ) const ( @@ -190,12 +190,13 @@ func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err erro } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table names cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table names cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table names cache built", "duration", time.Since(now)) + }() + } _, tableNames, err := c.ObjectClient.List(ctx, "", delimiter) if err != nil { @@ -276,12 +277,13 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient } }() - logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building table cache") - now := time.Now() - defer func() { - level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now)) - }() + if sp := opentracing.SpanFromContext(ctx); sp != nil { + sp.LogKV("msg", "building table cache") + now := time.Now() + defer func() { + sp.LogKV("msg", "table cache built", "duration", time.Since(now)) + }() + } objects, _, err := objectClient.List(ctx, t.name+delimiter, "") if err != nil {