Skip to content

Commit

Permalink
refactor: Remove unnecessary spanlogger usage (#13255)
Browse files Browse the repository at this point in the history
Historically, we've been using `spanlogger` to enrich our spans. That isn't the right usage for it (as of now), since spanloggers also emit log lines. Our current usage is causing:
- Noisy/important log lines to become less visible
- Unnecessary logging volume
- The spread of a bad practice.
With this PR I'm moving away from `spanlogger` in all places where the log message is poor or nonexistent. That's because these cases indicate we don't care about that log line at all, and only about the data injected into the span.
  • Loading branch information
DylanGuedes authored Jun 21, 2024
1 parent f06eabb commit c1fada9
Show file tree
Hide file tree
Showing 19 changed files with 112 additions and 124 deletions.
4 changes: 1 addition & 3 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
logutil "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)

Expand Down Expand Up @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int {
func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec")
defer sp.Finish()
spLogger := spanlogger.FromContext(ctx)

sp.LogKV(
"type", GetRangeType(q.params),
Expand Down Expand Up @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data))
statResult.Log(level.Debug(spLogger))
sp.LogKV(statResult.KVList()...)

status, _ := server.ClientHTTPStatusAndError(err)

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"query", query,
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) {

func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
logger := log.NewLogfmtLogger(buf)
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
Expand Down
29 changes: 17 additions & 12 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/dustin/go-humanize"

"github.com/go-kit/log"
)

Expand Down Expand Up @@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
return stats
}

// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
func (r Result) Log(logger log.Logger) {
logger.Log(r.KVList()...)
}

func (r Result) KVList() []any {
result := []any{
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
Expand Down Expand Up @@ -549,25 +553,26 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
"Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured,
)
r.Caches.Log(log)
r.Summary.Log(log)
}

result = append(result, r.Caches.kvList()...)
return append(result, r.Summary.kvList()...)
}

func (s Summary) Log(log log.Logger) {
_ = log.Log(
func (s Summary) kvList() []any {
return []any{
"Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)),
"Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.PostFilterLines", s.TotalPostFilterLines,
"Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime),
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}
}

func (c Caches) Log(log log.Logger) {
_ = log.Log(
func (c Caches) kvList() []any {
return []any{
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
Expand Down Expand Up @@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) {
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}
}
35 changes: 20 additions & 15 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
resLength = len(resp.Values)
}
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand Down Expand Up @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ
}

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)
logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)

return resp, statResult, err
}
Expand All @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)
logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand All @@ -327,16 +330,17 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordShardsQueryMetrics(
ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
)

return resp, err
Expand All @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)
logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)

return resp, nil
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

sp := opentracing.SpanFromContext(ctx)
iters := []iter.EntryIterator{}
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
Expand All @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying ingester",
"params", newParams)
if sp != nil {
sp.LogKV(
"msg", "querying ingester",
"params", newParams)
}
ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
Expand All @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying store",
"params", params)
if sp != nil {
sp.LogKV(
"msg", "querying store",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"reflect"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware(
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest")
defer sp.Finish()
log := spanlogger.FromContextWithFallback(ctx, q.logger)
defer log.Finish()

expr, err := syntax.ParseExpr(r.GetQuery())
if err != nil {
Expand All @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra

combinedStats := stats.MergeStats(matcherStats...)

level.Debug(log).Log(
level.Debug(q.logger).Log(
append(
combinedStats.LoggingKeyValues(),
"msg", "queried index",
Expand Down Expand Up @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu
level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
}

level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
}

return q.next.Do(ctx, r)
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/grafana/dskit/httpgrpc"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

// StatusSuccess Prometheus success result.
Expand Down Expand Up @@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()

buf, err := bodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
}
log.LogFields(otlog.Int("bytes", len(buf)))
sp.LogKV(otlog.Int("bytes", len(buf)))

var resp PrometheusResponse
if err := json.Unmarshal(buf, &resp); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard
}

func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
logger := spanlogger.FromContextWithFallback(
logger := util_log.WithContext(ctx, ast.logger)
spLogger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, ast.logger),
logger,
)

params, err := ParamsFromRequest(r)
Expand All @@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que

maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression())
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
return ast.next.Do(ctx, r)
}

conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)))
// cannot shard with this timerange
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
return ast.next.Do(ctx, r)
}

Expand Down Expand Up @@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
v := ast.limits.TSDBShardingStrategy(tenants[0])
version, err := logql.ParseShardVersion(v)
if err != nil {
level.Warn(logger).Log(
level.Warn(spLogger).Log(
"msg", "failed to parse shard version",
"fallback", version.String(),
"err", err.Error(),
Expand All @@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que

noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression())
if err != nil {
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())
Expand Down
Loading

0 comments on commit c1fada9

Please sign in to comment.