Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove unnecessary spanlogger usage #13255

Merged
merged 18 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/logcli/print/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (k kvLogger) Log(keyvals ...interface{}) error {

func (r *QueryResultPrinter) PrintStats(stats stats.Result) {
writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0)
stats.Log(kvLogger{Writer: writer})
stats.LogWithLogger(kvLogger{Writer: writer})
}

func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet {
Expand Down
4 changes: 1 addition & 3 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
logutil "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)

Expand Down Expand Up @@ -231,7 +230,6 @@ func (q *query) resultLength(res promql_parser.Value) int {
func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "query.Exec")
defer sp.Finish()
spLogger := spanlogger.FromContext(ctx)

sp.LogKV(
"type", GetRangeType(q.params),
Expand Down Expand Up @@ -265,7 +263,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

statResult := statsCtx.Result(time.Since(start), queueTime, q.resultLength(data))
statResult.Log(level.Debug(spLogger))
statResult.LogWithSpan(sp)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about a little bit different approach where we leave the original stats.Log() method and use that to print the stats in metrics.go.

But here do: sp.LogKV(statResult.KVList()...)

Copy link
Contributor Author

@DylanGuedes DylanGuedes Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like your style more but fyi we use LogWithSpan at other places. I'll try using LogKV there too then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed 0c9570a addressing it


status, _ := server.ClientHTTPStatusAndError(err)

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"query", query,
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func TestLogSlowQuery(t *testing.T) {

func TestLogLabelsQuery(t *testing.T) {
buf := bytes.NewBufferString("")
logger := log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
logger := log.NewLogfmtLogger(buf)
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
Expand Down
53 changes: 41 additions & 12 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"

"github.com/dustin/go-humanize"
"github.com/opentracing/opentracing-go"

"github.com/go-kit/log"
)

Expand Down Expand Up @@ -518,9 +520,22 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
return stats
}

// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
// LogWithSpan records the full set of query statistics — the top-level
// result fields plus the cache and summary breakdowns — as structured
// key/value log entries on the given tracing span.
func (r Result) LogWithSpan(sp opentracing.Span) {
	sp.LogKV(r.KVList()...)

	r.Caches.LogWithSpan(sp)
	r.Summary.LogWithSpan(sp)
}

// LogWithLogger emits the full set of query statistics — the top-level
// result fields plus the cache and summary breakdowns — through the
// given go-kit logger.
func (r Result) LogWithLogger(logger log.Logger) {
	// log.Logger.Log returns an error by contract; stats logging is
	// best-effort, so discard it explicitly (matches the former Log method,
	// which used `_ = log.Log(...)`, and keeps errcheck clean).
	_ = logger.Log(r.KVList()...)

	r.Caches.LogWithLogger(logger)
	r.Summary.LogWithLogger(logger)
}

func (r Result) KVList() []any {
return []any{
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
Expand Down Expand Up @@ -549,25 +564,35 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
"Querier.QueryReferencedStructuredMetadata", r.Querier.Store.QueryReferencedStructured,
)
r.Caches.Log(log)
r.Summary.Log(log)
}
}

// LogWithLogger emits the summary statistics (throughput, totals,
// execution and queue time) through the given go-kit logger.
func (s Summary) LogWithLogger(logger log.Logger) {
	// Explicitly discard the Log error: stats output is best-effort,
	// mirroring the `_ = log.Log(...)` style of the method this replaced.
	_ = logger.Log(s.KVList()...)
}

func (s Summary) Log(log log.Logger) {
_ = log.Log(
func (s Summary) KVList() []any {
return []any{
"Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)),
"Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.PostFilterLines", s.TotalPostFilterLines,
"Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime),
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}
}

func (c Caches) Log(log log.Logger) {
_ = log.Log(
// LogWithSpan records the summary statistics (throughput, totals,
// execution and queue time) as key/value log entries on the given span.
func (s Summary) LogWithSpan(sp opentracing.Span) {
	sp.LogKV(s.KVList()...)
}

// LogWithLogger emits the per-cache statistics (chunk, index, result
// caches, etc.) through the given go-kit logger.
func (c Caches) LogWithLogger(logger log.Logger) {
	// Explicitly discard the Log error: stats output is best-effort,
	// mirroring the `_ = log.Log(...)` style of the method this replaced.
	_ = logger.Log(c.KVList()...)
}

func (c Caches) KVList() []any {
return []any{
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
Expand Down Expand Up @@ -620,5 +645,9 @@ func (c Caches) Log(log log.Logger) {
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}
}

// LogWithSpan records the per-cache statistics (chunk, index, result
// caches, etc.) as key/value log entries on the given span.
func (c Caches) LogWithSpan(sp opentracing.Span) {
	sp.LogKV(c.KVList()...)
}
2 changes: 1 addition & 1 deletion pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestResult(t *testing.T) {
fakeIngesterQuery(ctx)

res := stats.Result(2*time.Second, 2*time.Nanosecond, 10)
res.Log(util_log.Logger)
res.LogWithLogger(util_log.Logger)
expected := Result{
Ingester: Ingester{
TotalChunksMatched: 200,
Expand Down
35 changes: 20 additions & 15 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
resLength = len(resp.Values)
}
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
statResult.LogWithSpan(sp)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)
logql.RecordLabelQueryMetrics(ctx, util_log.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand Down Expand Up @@ -266,15 +267,16 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ
}

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
statResult.LogWithSpan(sp)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)
logql.RecordSeriesQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)

return resp, statResult, err
}
Expand All @@ -296,15 +298,16 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
statResult.LogWithSpan(sp)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)
logql.RecordStatsQueryMetrics(ctx, util_log.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)

return resp, err
}
Expand All @@ -327,16 +330,17 @@ func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQ

statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
statResult.LogWithSpan(sp)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordShardsQueryMetrics(
ctx, log, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
ctx, util_log.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
)

return resp, err
Expand All @@ -363,15 +367,16 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
if sp := opentracing.SpanFromContext(ctx); sp != nil {
statResult.LogWithSpan(sp)
}

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)
logql.RecordVolumeQueryMetrics(ctx, util_log.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)

return resp, nil
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

sp := opentracing.SpanFromContext(ctx)
iters := []iter.EntryIterator{}
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
Expand All @@ -171,9 +172,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying ingester",
"params", newParams)
if sp != nil {
sp.LogKV(
"msg", "querying ingester",
"params", newParams)
}
ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
return nil, err
Expand All @@ -185,9 +188,11 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "querying store",
"params", params)
if sp != nil {
sp.LogKV(
"msg", "querying store",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"reflect"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

const (
Expand Down Expand Up @@ -144,8 +142,7 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
sp.LogKV("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")

res, err := in.handler.Do(ctx, req)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ func NewQuerySizeLimiterMiddleware(
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "querySizeLimiter.getBytesReadForRequest")
defer sp.Finish()
log := spanlogger.FromContextWithFallback(ctx, q.logger)
defer log.Finish()

expr, err := syntax.ParseExpr(r.GetQuery())
if err != nil {
Expand All @@ -300,7 +298,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra

combinedStats := stats.MergeStats(matcherStats...)

level.Debug(log).Log(
level.Debug(q.logger).Log(
append(
combinedStats.LoggingKeyValues(),
"msg", "queried index",
Expand Down Expand Up @@ -371,8 +369,6 @@ func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (qu
level.Warn(log).Log("msg", "Query exceeds limits", "status", "rejected", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
}

level.Debug(log).Log("msg", "Query is within limits", "status", "accepted", "limit_name", q.guessLimitName(), "limit_bytes", maxBytesReadStr, "resolved_bytes", statsBytesStr)
}

return q.next.Do(ctx, r)
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/grafana/dskit/httpgrpc"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

// StatusSuccess Prometheus success result.
Expand Down Expand Up @@ -208,15 +208,13 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()

buf, err := bodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
}
log.LogFields(otlog.Int("bytes", len(buf)))
sp.LogKV(otlog.Int("bytes", len(buf)))

var resp PrometheusResponse
if err := json.Unmarshal(buf, &resp); err != nil {
Expand Down
Loading
Loading