Skip to content

Commit

Permalink
Add per-user query metrics for series and bytes returned (cortexproje…
Browse files Browse the repository at this point in the history
…ct#4343)

* Add per-user query metrics for series and bytes returned

Add stats included in query responses from the querier and distributor
for measuring the number of series and bytes included in successful
queries. These stats are emitted per-user as summaries from the query
frontends.

These stats were chosen to add visibility into the same resources that are limited
as part of cortexproject#4179 and cortexproject#4216.

Fixes cortexproject#4259

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Formatting fix

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Fix changelog to match actual changes

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Typo

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review changes, rename things for clarity

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review changes, remove superfluous summaries

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
2 people authored and alvinlin123 committed Jan 14, 2022
1 parent 11a31e1 commit 142b7a2
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4357
* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_total` and `cortex_query_fetched_chunks_bytes_total` per-user counters to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343
* [CHANGE] Update Go version to 1.16.6. #4362
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345
Expand Down
5 changes: 5 additions & 0 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -282,6 +283,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
reqStats = stats.FromContext(ctx)
)

// Fetch samples from multiple ingesters
Expand Down Expand Up @@ -383,6 +385,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
resp.Timeseries = append(resp.Timeseries, series)
}

reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))

return resp, nil
}

Expand Down
25 changes: 23 additions & 2 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Handler struct {

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

Expand All @@ -77,8 +79,20 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})

h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
Expand Down Expand Up @@ -165,9 +179,14 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds())
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
Expand All @@ -177,7 +196,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"method", r.Method,
"path", r.URL.Path,
"response_time", queryResponseTime,
"query_wall_time_seconds", stats.LoadWallTime().Seconds(),
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
Expand Down
63 changes: 63 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ package transport

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
)

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}

func TestWriteError(t *testing.T) {
for _, test := range []struct {
status int
Expand All @@ -28,3 +41,53 @@ func TestWriteError(t *testing.T) {
})
}
}

// TestHandler_ServeHTTP verifies that the per-user query stats metrics
// (query seconds, fetched series, fetched chunk bytes) are registered and
// populated only when QueryStatsEnabled is set on the handler config.
func TestHandler_ServeHTTP(t *testing.T) {
	for _, tt := range []struct {
		name            string
		cfg             HandlerConfig
		expectedMetrics int
	}{
		{
			name:            "test handler with stats enabled",
			cfg:             HandlerConfig{QueryStatsEnabled: true},
			expectedMetrics: 3,
		},
		{
			name:            "test handler with stats disabled",
			cfg:             HandlerConfig{QueryStatsEnabled: false},
			expectedMetrics: 0,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			// Stub the downstream round tripper so no real query is executed.
			roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
				return &http.Response{
					StatusCode: http.StatusOK,
					Body:       io.NopCloser(strings.NewReader("{}")),
				}, nil
			})

			reg := prometheus.NewPedanticRegistry()
			handler := NewHandler(tt.cfg, roundTripper, log.NewNopLogger(), reg)

			// The handler requires a tenant (org ID) in the request context.
			ctx := user.InjectOrgID(context.Background(), "12345")
			req := httptest.NewRequest("GET", "/", nil)
			req = req.WithContext(ctx)
			resp := httptest.NewRecorder()

			handler.ServeHTTP(resp, req)
			_, _ = io.ReadAll(resp.Body)
			// NOTE: testify convention is require.Equal(t, expected, actual).
			require.Equal(t, http.StatusOK, resp.Code)

			// Count how many of the stats metrics were actually registered;
			// with stats disabled none of them should exist.
			count, err := promtest.GatherAndCount(
				reg,
				"cortex_query_seconds_total",
				"cortex_query_fetched_series_total",
				"cortex_query_fetched_chunks_bytes_total",
			)

			assert.NoError(t, err)
			assert.Equal(t, tt.expectedMetrics, count)
		})
	}
}
24 changes: 14 additions & 10 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/series"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand Down Expand Up @@ -565,6 +566,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
numChunks = atomic.NewInt32(0)
spanLog = spanlogger.FromContext(ctx)
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
reqStats = stats.FromContext(ctx)
)

// Concurrently fetch series from all clients.
Expand Down Expand Up @@ -626,10 +628,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
}
}
chunksSize := 0
for _, c := range s.Chunks {
chunksSize += c.Size()
}
chunksSize := countChunkBytes(s)
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
Expand Down Expand Up @@ -657,10 +656,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
}
}

numSeries := len(mySeries)
chunkBytes := countChunkBytes(mySeries...)

reqStats.AddFetchedSeries(uint64(numSeries))
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))

level.Debug(spanLog).Log("msg", "received series from store-gateway",
"instance", c.RemoteAddress(),
"num series", len(mySeries),
"bytes series", countSeriesBytes(mySeries),
"fetched series", numSeries,
"fetched chunk bytes", chunkBytes,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))

Expand Down Expand Up @@ -944,12 +949,11 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) {
return res, nil
}

func countSeriesBytes(series []*storepb.Series) (count uint64) {
// countChunkBytes returns the size of the chunks making up the provided series in bytes
func countChunkBytes(series ...*storepb.Series) (count int) {
for _, s := range series {
for _, c := range s.Chunks {
if c.Raw != nil {
count += uint64(len(c.Raw.Data))
}
count += c.Size()
}
}

Expand Down
34 changes: 34 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,47 @@ func (s *Stats) LoadWallTime() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime)))
}

// AddFetchedSeries atomically adds `series` to the running count of series
// fetched while executing the query. Calling it on a nil receiver is a no-op,
// so callers do not need to check whether stats collection is enabled.
func (s *Stats) AddFetchedSeries(series uint64) {
	if s != nil {
		atomic.AddUint64(&s.FetchedSeriesCount, series)
	}
}

// LoadFetchedSeries atomically reads the number of series fetched so far for
// the query. A nil receiver yields 0.
func (s *Stats) LoadFetchedSeries() uint64 {
	if s != nil {
		return atomic.LoadUint64(&s.FetchedSeriesCount)
	}
	return 0
}

// AddFetchedChunkBytes atomically adds `bytes` to the running total of chunk
// bytes fetched while executing the query. Calling it on a nil receiver is a
// no-op, so callers do not need to check whether stats collection is enabled.
func (s *Stats) AddFetchedChunkBytes(bytes uint64) {
	if s != nil {
		atomic.AddUint64(&s.FetchedChunkBytes, bytes)
	}
}

// LoadFetchedChunkBytes atomically reads the total chunk bytes fetched so far
// for the query. A nil receiver yields 0.
func (s *Stats) LoadFetchedChunkBytes() uint64 {
	if s != nil {
		return atomic.LoadUint64(&s.FetchedChunkBytes)
	}
	return 0
}

// Merge accumulates the provided Stats into the receiver. It is a no-op when
// either side is nil, so partially-initialized stats can be merged safely.
func (s *Stats) Merge(other *Stats) {
	if s == nil || other == nil {
		return
	}

	// Each Add*/Load* pair is individually atomic; the merge as a whole is not.
	s.AddWallTime(other.LoadWallTime())
	s.AddFetchedSeries(other.LoadFetchedSeries())
	s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes())
}

func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool {
Expand Down
Loading

0 comments on commit 142b7a2

Please sign in to comment.