From 4a965598d64fec25ae15c9aea29d72ca5923b7dc Mon Sep 17 00:00:00 2001
From: Emad Mohamadi
Date: Wed, 14 Dec 2022 14:17:46 +0100
Subject: [PATCH] Add carbonserver render tracing

---
 carbon/app.go                |  1 +
 carbon/config.go             |  2 ++
 carbonserver/carbonserver.go |  6 ++++
 carbonserver/render.go       | 62 ++++++++++++++++++++++++++++++++----
 4 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/carbon/app.go b/carbon/app.go
index 5f8638d13..ed295ebf1 100644
--- a/carbon/app.go
+++ b/carbon/app.go
@@ -508,6 +508,7 @@ func (app *App) Start(version string) (err error) {
 
 	carbonserver.SetMaxInflightRequests(conf.Carbonserver.MaxInflightRequests)
 	carbonserver.SetNoServiceWhenIndexIsNotReady(conf.Carbonserver.NoServiceWhenIndexIsNotReady)
+	carbonserver.SetRenderTraceLoggingEnabled(conf.Carbonserver.RenderTraceLoggingEnabled)
 
 	if conf.Carbonserver.RequestTimeout != nil {
 		carbonserver.SetRequestTimeout(conf.Carbonserver.RequestTimeout.Value())
diff --git a/carbon/config.go b/carbon/config.go
index 4a1490dcc..514f639d3 100644
--- a/carbon/config.go
+++ b/carbon/config.go
@@ -155,6 +155,8 @@ type carbonserverConfig struct {
 		MaxInflightRequests uint      `toml:"max-inflight-requests"`
 		RequestTimeout      *Duration `toml:"request-timeout"`
 	} `toml:"api-per-path-rate-limiters"`
+
+	RenderTraceLoggingEnabled bool `toml:"render-trace-logging-enabled"`
 }
 
 type pprofConfig struct {
diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go
index 57f07002d..99e77a722 100644
--- a/carbonserver/carbonserver.go
+++ b/carbonserver/carbonserver.go
@@ -291,6 +291,8 @@ type CarbonserverListener struct {
 	NoServiceWhenIndexIsNotReady bool
 	apiPerPathRatelimiter        map[string]*ApiPerPathRatelimiter
 	globQueryRateLimiters        []*GlobQueryRateLimiter
+
+	renderTraceLoggingEnabled bool
 }
 
 type prometheus struct {
@@ -625,6 +627,10 @@ func (listener *CarbonserverListener) SetAPIPerPathRateLimiter(rls map[string]*A
 	listener.apiPerPathRatelimiter = rls
 }
 
+func (listener *CarbonserverListener) SetRenderTraceLoggingEnabled(enabled bool) {
+	listener.renderTraceLoggingEnabled = enabled
+}
+
 // skipcq: RVV-B0011
 func (listener *CarbonserverListener) CurrentFileIndex() *fileIndex {
 	p := listener.fileIdx.Load()
diff --git a/carbonserver/render.go b/carbonserver/render.go
index 992f493d4..3a5d60afb 100644
--- a/carbonserver/render.go
+++ b/carbonserver/render.go
@@ -218,6 +218,13 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req
 		zap.String("format", format.String()),
 	))
 
+	tle := &traceLogEntries{}
+	if listener.renderTraceLoggingEnabled {
+		defer func() {
+			tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
+			logger.Info("render trace", zap.Any("trace_log_entries", *tle))
+		}()
+	}
 	// Make sure we log which metric caused a panic()
 	defer func() {
 		if r := recover(); r != nil {
@@ -236,7 +243,7 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req
 		}
 	}()
 
-	response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets)
+	response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets, tle)
 
 	wr.Header().Set("Content-Type", response.contentType)
 	if err != nil {
@@ -325,7 +332,7 @@ func (listener *CarbonserverListener) getRenderCacheKeyAndSize(targets map[timeR
 	return key, size
 }
 
-func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, bool, error) {
+func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, bool, error) {
 	logger = logger.With(
 		zap.String("function", "fetchWithCache"),
 	)
@@ -336,10 +343,12 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
 
 	if listener.queryCacheEnabled {
 		key, size := listener.getRenderCacheKeyAndSize(targets, format.String())
 		var res interface{}
+		cacheT0 := time.Now()
 		res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60, func() (interface{}, error) {
-			return listener.prepareDataProto(ctx, logger, format, targets)
+			return listener.prepareDataProto(ctx, logger, format, targets, tle)
 		})
+		tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
 		if err == nil {
 			response = res.(fetchResponse)
 			listener.prometheus.cacheRequest("query", fromCache)
@@ -348,9 +357,10 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
 			} else {
 				atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
 			}
+			tle.FromCache = fromCache
 		}
 	} else {
-		response, err = listener.prepareDataProto(ctx, logger, format, targets)
+		response, err = listener.prepareDataProto(ctx, logger, format, targets, tle)
 	}
 	return response, fromCache, err
 }
@@ -450,7 +460,7 @@ func (listener *CarbonserverListener) prepareDataStream(ctx context.Context, for
 	}
 }
 
-func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, error) {
+func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, error) {
 	contentType := "application/text"
 	var b []byte
 	var metricsFetched int
@@ -464,12 +474,18 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
 	responseChan := make(chan response, 1000)
 	metricNames := getUniqueMetricNames(targets)
 	// TODO: pipeline?
+	expansionT0 := time.Now()
 	expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
+	tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
 	if expandedGlobs == nil {
 		return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
 	}
 	metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
-	go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
+	go func() {
+		prepareT0 := time.Now()
+		listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
+		tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
+	}()
 
 	var metrics []string
 	for renderResponse := range responseChan {
@@ -511,6 +527,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
 		}
 	}
 
+	marshalT0 := time.Now()
 	switch format {
 	// We still keep old json format, because it's painful to deal with math.NaN that can occur in new format.
 	case jsonFormat:
@@ -562,6 +579,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
 	default:
 		err = fmt.Errorf("unknown format: %v", format)
 	}
+	tle.MarshalDuration = float64(time.Since(marshalT0)) / float64(time.Second)
 
 	if err != nil {
 		return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
@@ -658,6 +676,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 		zap.String("peer", reqPeer),
 	))
 
+	tle := &traceLogEntries{}
+	if listener.renderTraceLoggingEnabled {
+		defer func() {
+			tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
+			logger.Info("render trace", zap.Any("trace_log_entries", *tle))
+		}()
+	}
 	// Make sure we log which metric caused a panic()
 	defer func() {
 		if r := recover(); r != nil {
@@ -687,26 +712,36 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 	fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
 		metricNames := getUniqueMetricNames(targets)
 		// TODO: pipeline?
+		expansionT0 := time.Now()
 		expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
+		tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
 		if expandedGlobs == nil {
 			if err != nil {
 				return nil, status.New(codes.InvalidArgument, err.Error()).Err()
 			}
 		}
 		metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
-		go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
+		go func() {
+			prepareT0 := time.Now()
+			listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
+			tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
+		}()
 
 		var responses []response
+		streamT0 := time.Now()
 		responses, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, getMetrics)
+		tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
 		return responses, err
 	}
 
 	if listener.streamingQueryCacheEnabled {
 		key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
 		var res interface{}
+		cacheT0 := time.Now()
 		res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60, func() (interface{}, error) {
 			return fetchAndStreamMetricsFunc(true)
 		})
+		tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
 		if err == nil {
 			listener.prometheus.cacheRequest("query", fromCache)
 			if fromCache {
@@ -718,10 +753,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 					}
 					close(responseChan)
 				}()
+				streamT0 := time.Now()
 				_, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, false)
+				tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
 			} else {
 				atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
 			}
+			tle.FromCache = fromCache
 		}
 	} else {
 		_, err = fetchAndStreamMetricsFunc(false)
@@ -769,3 +807,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 
 	return nil
 }
+
+type traceLogEntries struct {
+	FromCache             bool    `json:"from_cache"`
+	GlobExpansionDuration float64 `json:"glob_expansion_duration"`
+	CacheDuration         float64 `json:"cache_duration"`
+	PrepareDuration       float64 `json:"prepare_duration"`
+	StreamDuration        float64 `json:"stream_duration"`
+	MarshalDuration       float64 `json:"marshal_duration"`
+	TotalDuration         float64 `json:"total_duration"`
+}
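
Usage sketch: with this change applied, the tracing is switched on from
go-carbon's TOML configuration via the render-trace-logging-enabled key that
the patch adds to carbonserverConfig. A minimal sketch, assuming go-carbon's
standard [carbonserver] config section:

    [carbonserver]
    render-trace-logging-enabled = true

Code that wires up the listener itself can call the new setter directly,
mirroring what carbon/app.go does at startup (the variable name `listener` is
illustrative):

    listener.SetRenderTraceLoggingEnabled(true)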
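
When enabled, each render request logs a single "render trace" info entry whose
trace_log_entries field serializes the traceLogEntries struct above; all
durations are fractional seconds. Note that the HTTP handler path only measures
marshal_duration and the gRPC Render path only measures stream_duration, so the
other field stays zero. A hypothetical entry for an uncached HTTP render (all
values illustrative only, not taken from a real log):

    {"from_cache": false, "glob_expansion_duration": 0.004,
     "cache_duration": 0.118, "prepare_duration": 0.056,
     "stream_duration": 0, "marshal_duration": 0.008,
     "total_duration": 0.131}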