Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove unnecessary spanlogger usage #13255

Merged
merged 18 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
logutil "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)

Expand Down Expand Up @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int {
func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec")
defer sp.Finish()
spLogger := spanlogger.FromContext(ctx)

sp.LogKV(
"type", GetRangeType(q.params),
Expand Down Expand Up @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data))
statResult.Log(level.Debug(spLogger))
sp.LogKV(statResult.KVList()...)

status, _ := server.ClientHTTPStatusAndError(err)

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"query", query,
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) {

func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
logger := log.NewLogfmtLogger(buf)
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
Expand Down
29 changes: 17 additions & 12 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/dustin/go-humanize"

"github.com/go-kit/log"
)

Expand Down Expand Up @@ -518,9 +519,12 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
return stats
}

// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
func (r Result) Log(logger log.Logger) {
logger.Log(r.KVList()...)
}

func (r Result) KVList() []any {
result := []any{
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
Expand Down Expand Up @@ -549,25 +553,26 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
"Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured,
)
r.Caches.Log(log)
r.Summary.Log(log)
}

result = append(result, r.Caches.kvList()...)
return append(result, r.Summary.kvList()...)
}

func (s Summary) Log(log log.Logger) {
_ = log.Log(
func (s Summary) kvList() []any {
return []any{
"Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)),
"Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.PostFilterLines", s.TotalPostFilterLines,
"Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime),
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}
}

func (c Caches) Log(log log.Logger) {
_ = log.Log(
func (c Caches) kvList() []any {
return []any{
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
Expand Down Expand Up @@ -620,5 +625,5 @@ func (c Caches) Log(log log.Logger) {
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}
}
35 changes: 20 additions & 15 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
resLength = len(resp.Values)
}
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand Down Expand Up @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ
}

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)
logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)

return resp, statResult, err
}
Expand All @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)
logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand All @@ -327,16 +330,17 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordShardsQueryMetrics(
ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
)

return resp, err
Expand All @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV(statResult.KVList()...)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)
logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)

return resp, nil
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

sp := opentracing.SpanFromContext(ctx)
iters := []iter.EntryIterator{}
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
Expand All @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying ingester",
"params", newParams)
if sp != nil {
sp.LogKV(
"msg", "querying ingester",
"params", newParams)
}
ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
Expand All @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying store",
"params", params)
if sp != nil {
sp.LogKV(
"msg", "querying store",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"reflect"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware(
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest")
defer sp.Finish()
log := spanlogger.FromContextWithFallback(ctx, q.logger)
defer log.Finish()

expr, err := syntax.ParseExpr(r.GetQuery())
if err != nil {
Expand All @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra

combinedStats := stats.MergeStats(matcherStats...)

level.Debug(log).Log(
level.Debug(q.logger).Log(
append(
combinedStats.LoggingKeyValues(),
"msg", "queried index",
Expand Down Expand Up @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu
level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
}

level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
}

return q.next.Do(ctx, r)
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/grafana/dskit/httpgrpc"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

// StatusSuccess Prometheus success result.
Expand Down Expand Up @@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()

buf, err := bodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
}
log.LogFields(otlog.Int("bytes", len(buf)))
sp.LogKV(otlog.Int("bytes", len(buf)))

var resp PrometheusResponse
if err := json.Unmarshal(buf, &resp); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ func (ast *astMapperware) checkQuerySizeLimit(ctx context.Context, bytesPerShard
}

func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
logger := spanlogger.FromContextWithFallback(
logger := util_log.WithContext(ctx, ast.logger)
spLogger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, ast.logger),
logger,
)

params, err := ParamsFromRequest(r)
Expand All @@ -158,14 +159,14 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que

maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression())
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
return ast.next.Do(ctx, r)
}

conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)))
// cannot shard with this timerange
if err != nil {
level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
level.Warn(spLogger).Log("err", err.Error(), "msg", "skipped AST mapper for request")
return ast.next.Do(ctx, r)
}

Expand Down Expand Up @@ -200,7 +201,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
v := ast.limits.TSDBShardingStrategy(tenants[0])
version, err := logql.ParseShardVersion(v)
if err != nil {
level.Warn(logger).Log(
level.Warn(spLogger).Log(
"msg", "failed to parse shard version",
"fallback", version.String(),
"err", err.Error(),
Expand All @@ -214,7 +215,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que

noop, bytesPerShard, parsed, err := mapper.Parse(params.GetExpression())
if err != nil {
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
level.Warn(spLogger).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())
Expand Down
Loading
Loading