From 142b7a26e0b924073533a7c054fd0558d426b538 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri <56quarters@users.noreply.github.com> Date: Tue, 20 Jul 2021 10:34:29 -0400 Subject: [PATCH] Add per-user query metrics for series and bytes returned (#4343) * Add per-user query metrics for series and bytes returned Add stats included in query responses from the querier and distributor for measuring the number of series and bytes included in successful queries. These stats are emitted per-user as summaries from the query frontends. These stats are picked to add visibility into the same resources limited as part of #4179 and #4216. Fixes #4259 Signed-off-by: Nick Pillitteri * Formatting fix Signed-off-by: Nick Pillitteri * Fix changelog to match actual changes Signed-off-by: Nick Pillitteri * Typo Signed-off-by: Nick Pillitteri * Code review changes, rename things for clarity Signed-off-by: Nick Pillitteri * Apply suggestions from code review Co-authored-by: Marco Pracucci Signed-off-by: Nick Pillitteri * Code review changes, remove superfluous summaries Signed-off-by: Nick Pillitteri Co-authored-by: Marco Pracucci Signed-off-by: Alvin Lin --- CHANGELOG.md | 1 + pkg/distributor/query.go | 5 ++ pkg/frontend/transport/handler.go | 25 +++++- pkg/frontend/transport/handler_test.go | 63 +++++++++++++ pkg/querier/blocks_store_queryable.go | 24 ++--- pkg/querier/stats/stats.go | 34 +++++++ pkg/querier/stats/stats.pb.go | 118 +++++++++++++++++++++---- pkg/querier/stats/stats.proto | 4 + pkg/querier/stats/stats_test.go | 91 +++++++++++++++++++ 9 files changed, 337 insertions(+), 28 deletions(-) create mode 100644 pkg/querier/stats/stats_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c93e39c0a7..5c5fdea53f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317 * [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4357 +* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_total` and `cortex_query_fetched_chunks_bytes_total` per-user counters to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343 * [CHANGE] Update Go version to 1.16.6. #4362 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260 * [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345 diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8619a07125..87b45164dc 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -13,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -282,6 +283,7 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { var ( queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) + reqStats = stats.FromContext(ctx) ) // Fetch samples from multiple ingesters @@ -383,6 +385,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri resp.Timeseries = append(resp.Timeseries, series) } + reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) + reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize())) + return resp, nil } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 435a022748..af87e4fe30 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -60,6 +60,8 @@ type Handler struct { // Metrics. querySeconds *prometheus.CounterVec + querySeries *prometheus.CounterVec + queryBytes *prometheus.CounterVec activeUsers *util.ActiveUsersCleanupService } @@ -77,8 +79,20 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Total amount of wall clock time spend processing queries.", }, []string{"user"}) + h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_series_total", + Help: "Number of series fetched to execute a query.", + }, []string{"user"}) + + h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_chunks_bytes_total", + Help: "Size of all chunks fetched to execute a query in bytes.", + }, []string{"user"}) + h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) + h.querySeries.DeleteLabelValues(user) + h.queryBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -165,9 +179,14 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer return } userID := tenant.JoinTenantIDs(tenantIDs) + wallTime := stats.LoadWallTime() + numSeries := stats.LoadFetchedSeries() + numBytes := stats.LoadFetchedChunkBytes() // Track stats. - f.querySeconds.WithLabelValues(userID).Add(stats.LoadWallTime().Seconds()) + f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) + f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) + f.queryBytes.WithLabelValues(userID).Add(float64(numBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. @@ -177,7 +196,9 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "method", r.Method, "path", r.URL.Path, "response_time", queryResponseTime, - "query_wall_time_seconds", stats.LoadWallTime().Seconds(), + "query_wall_time_seconds", wallTime.Seconds(), + "fetched_series_count", numSeries, + "fetched_chunks_bytes", numBytes, }, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 136f187993..8553d9fe21 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -2,15 +2,28 @@ package transport import ( "context" + "io" "net/http" "net/http/httptest" + "strings" "testing" + "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" ) +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} + func TestWriteError(t *testing.T) { for _, test := range []struct { status int @@ -28,3 +41,53 @@ func TestWriteError(t *testing.T) { }) } } + +func TestHandler_ServeHTTP(t *testing.T) { + for _, tt := range []struct { + name string + cfg HandlerConfig + expectedMetrics int + }{ + { + name: "test handler with stats enabled", + cfg: HandlerConfig{QueryStatsEnabled: true}, + expectedMetrics: 3, + }, + { + name: "test handler with stats disabled", + cfg: HandlerConfig{QueryStatsEnabled: false}, + expectedMetrics: 0, + }, + } { + t.Run(tt.name, func(t *testing.T) { + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("{}")), + }, nil + }) + + reg := prometheus.NewPedanticRegistry() + handler := NewHandler(tt.cfg, roundTripper, log.NewNopLogger(), reg) + + ctx := user.InjectOrgID(context.Background(), "12345") + req := httptest.NewRequest("GET", "/", nil) + req = req.WithContext(ctx) + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + _, _ = io.ReadAll(resp.Body) + require.Equal(t, resp.Code, http.StatusOK) + + count, err := promtest.GatherAndCount( + reg, + "cortex_query_seconds_total", + "cortex_query_fetched_series_total", + "cortex_query_fetched_chunks_bytes_total", + ) + + assert.NoError(t, err) + assert.Equal(t, tt.expectedMetrics, count) + }) + } +} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 98cb61e140..02c50b5b9d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -565,6 +566,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( numChunks = atomic.NewInt32(0) spanLog = spanlogger.FromContext(ctx) queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) + reqStats = stats.FromContext(ctx) ) // Concurrently fetch series from all clients. @@ -626,10 +628,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) } } - chunksSize := 0 - for _, c := range s.Chunks { - chunksSize += c.Size() - } + chunksSize := countChunkBytes(s) if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { return validation.LimitError(chunkBytesLimitErr.Error()) } @@ -657,10 +656,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } } + numSeries := len(mySeries) + chunkBytes := countChunkBytes(mySeries...) + + reqStats.AddFetchedSeries(uint64(numSeries)) + reqStats.AddFetchedChunkBytes(uint64(chunkBytes)) + level.Debug(spanLog).Log("msg", "received series from store-gateway", "instance", c.RemoteAddress(), - "num series", len(mySeries), - "bytes series", countSeriesBytes(mySeries), + "fetched series", numSeries, + "fetched chunk bytes", chunkBytes, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) @@ -944,12 +949,11 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { return res, nil } -func countSeriesBytes(series []*storepb.Series) (count uint64) { +// countChunkBytes returns the size of the chunks making up the provided series in bytes +func countChunkBytes(series ...*storepb.Series) (count int) { for _, s := range series { for _, c := range s.Chunks { - if c.Raw != nil { - count += uint64(len(c.Raw.Data)) - } + count += c.Size() } } diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 05a7de5347..1a39b32069 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -54,6 +54,38 @@ func (s *Stats) LoadWallTime() time.Duration { return time.Duration(atomic.LoadInt64((*int64)(&s.WallTime))) } +func (s *Stats) AddFetchedSeries(series uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.FetchedSeriesCount, series) +} + +func (s *Stats) LoadFetchedSeries() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.FetchedSeriesCount) +} + +func (s *Stats) AddFetchedChunkBytes(bytes uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.FetchedChunkBytes, bytes) +} + +func (s *Stats) LoadFetchedChunkBytes() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.FetchedChunkBytes) +} + // Merge the provide Stats into this one. func (s *Stats) Merge(other *Stats) { if s == nil || other == nil { @@ -61,6 +93,8 @@ func (s *Stats) Merge(other *Stats) { } s.AddWallTime(other.LoadWallTime()) + s.AddFetchedSeries(other.LoadFetchedSeries()) + s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index b9ec9a49ba..9fd4affc1f 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -32,6 +32,10 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Stats struct { // The sum of all wall time spent in the querier to execute the query. WallTime time.Duration `protobuf:"bytes,1,opt,name=wall_time,json=wallTime,proto3,stdduration" json:"wall_time"` + // The number of series fetched for the query + FetchedSeriesCount uint64 `protobuf:"varint,2,opt,name=fetched_series_count,json=fetchedSeriesCount,proto3" json:"fetched_series_count,omitempty"` + // The number of bytes of the chunks fetched for the query + FetchedChunkBytes uint64 `protobuf:"varint,3,opt,name=fetched_chunk_bytes,json=fetchedChunkBytes,proto3" json:"fetched_chunk_bytes,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -73,6 +77,20 @@ func (m *Stats) GetWallTime() time.Duration { return 0 } +func (m *Stats) GetFetchedSeriesCount() uint64 { + if m != nil { + return m.FetchedSeriesCount + } + return 0 +} + +func (m *Stats) GetFetchedChunkBytes() uint64 { + if m != nil { + return m.FetchedChunkBytes + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") } @@ -80,21 +98,25 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 213 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0x2e, 0x49, 0x2c, - 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x73, 0xa4, 0x74, 0xd3, 0x33, 0x4b, - 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, 0xb2, 0x49, - 0xa5, 0x69, 0x60, 0x1e, 0x98, 0x03, 0x66, 0x41, 0x74, 0x49, 0xc9, 0xa5, 0xe7, 0xe7, 0xa7, 0xe7, - 0xa4, 0x22, 0x54, 0xa5, 0x94, 0x16, 0x25, 0x96, 0x64, 0xe6, 0xe7, 0x41, 0xe4, 0x95, 0x3c, 0xb9, - 0x58, 0x83, 0x41, 0xe6, 0x0a, 0x39, 0x70, 0x71, 0x96, 0x27, 0xe6, 0xe4, 0xc4, 0x97, 0x64, 0xe6, - 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x1b, 0x49, 0xea, 0x41, 0x34, 0xeb, 0xc1, 0x34, 0xeb, - 0xb9, 0x40, 0x35, 0x3b, 0x71, 0x9c, 0xb8, 0x27, 0xcf, 0x30, 0xe3, 0xbe, 0x3c, 0x63, 0x10, 0x07, - 0x48, 0x57, 0x48, 0x66, 0x6e, 0xaa, 0x93, 0xf5, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, - 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, - 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, - 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, - 0x88, 0xb7, 0x92, 0xd8, 0xc0, 0x76, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2d, 0xc4, 0x26, - 0x5d, 0xf3, 0x00, 0x00, 0x00, + // 281 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0xd0, 0xb1, 0x4e, 0x83, 0x40, + 0x1c, 0xc7, 0xf1, 0xfb, 0xab, 0x35, 0x95, 0x4e, 0xa2, 0x03, 0x76, 0xf8, 0xb7, 0x71, 0xea, 0xe2, + 0xd5, 0xe8, 0xe8, 0x62, 0xa8, 0x4f, 0xd0, 0x3a, 0xb9, 0x10, 0xa0, 0x57, 0x20, 0x02, 0x67, 0xe0, + 0x2e, 0xc6, 0xcd, 0x47, 0x70, 0xf4, 0x11, 0x4c, 0x7c, 0x91, 0x8e, 0x8c, 0x9d, 0x54, 0x8e, 0xc5, + 0xb1, 0x8f, 0x60, 0xee, 0xa0, 0x71, 0xe3, 0x97, 0x0f, 0xdf, 0x4b, 0xee, 0xac, 0x41, 0x29, 0x7c, + 0x51, 0xd2, 0xa7, 0x82, 0x0b, 0x6e, 0xf7, 0xcc, 0x18, 0x5e, 0x44, 0x89, 0x88, 0x65, 0x40, 0x43, + 0x9e, 0x4d, 0x23, 0x1e, 0xf1, 0xa9, 0xd1, 0x40, 0xae, 0xcc, 0x32, 0xc3, 0x7c, 0xb5, 0xd5, 0x10, + 0x23, 0xce, 0xa3, 0x94, 0xfd, 0xff, 0xb5, 0x94, 0x85, 0x2f, 0x12, 0x9e, 0xb7, 0x7e, 0xfe, 0x09, + 0x56, 0x6f, 0xa1, 0x0f, 0xb6, 0x6f, 0xad, 0xa3, 0x67, 0x3f, 0x4d, 0x3d, 0x91, 0x64, 0xcc, 0x81, + 0x31, 0x4c, 0x06, 0x57, 0x67, 0xb4, 0xad, 0xe9, 0xae, 0xa6, 0x77, 0x5d, 0xed, 0xf6, 0xd7, 0x5f, + 0x23, 0xf2, 0xfe, 0x3d, 0x82, 0x79, 0x5f, 0x57, 0xf7, 0x49, 0xc6, 0xec, 0x4b, 0xeb, 0x74, 0xc5, + 0x44, 0x18, 0xb3, 0xa5, 0x57, 0xb2, 0x22, 0x61, 0xa5, 0x17, 0x72, 0x99, 0x0b, 0x67, 0x6f, 0x0c, + 0x93, 0x83, 0xb9, 0xdd, 0xd9, 0xc2, 0xd0, 0x4c, 0x8b, 0x4d, 0xad, 0x93, 0x5d, 0x11, 0xc6, 0x32, + 0x7f, 0xf4, 0x82, 0x17, 0xc1, 0x4a, 0x67, 0xdf, 0x04, 0xc7, 0x1d, 0xcd, 0xb4, 0xb8, 0x1a, 0xdc, + 0x9b, 0xaa, 0x46, 0xb2, 0xa9, 0x91, 0x6c, 0x6b, 0x84, 0x57, 0x85, 0xf0, 0xa1, 0x10, 0xd6, 0x0a, + 0xa1, 0x52, 0x08, 0x3f, 0x0a, 0xe1, 0x57, 0x21, 0xd9, 0x2a, 0x84, 0xb7, 0x06, 0x49, 0xd5, 0x20, + 0xd9, 0x34, 0x48, 0x1e, 0xda, 0x97, 0x0b, 0x0e, 0xcd, 0x2d, 0xae, 0xff, 0x02, 0x00, 0x00, 0xff, + 0xff, 0x9d, 0xf1, 0x86, 0xb8, 0x56, 0x01, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -119,15 +141,23 @@ func (this *Stats) Equal(that interface{}) bool { if this.WallTime != that1.WallTime { return false } + if this.FetchedSeriesCount != that1.FetchedSeriesCount { + return false + } + if this.FetchedChunkBytes != that1.FetchedChunkBytes { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 7) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") + s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") + s = append(s, "FetchedChunkBytes: "+fmt.Sprintf("%#v", this.FetchedChunkBytes)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -159,6 +189,16 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.FetchedChunkBytes != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedChunkBytes)) + i-- + dAtA[i] = 0x18 + } + if m.FetchedSeriesCount != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedSeriesCount)) + i-- + dAtA[i] = 0x10 + } n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.WallTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime):]) if err1 != nil { return 0, err1 @@ -189,6 +229,12 @@ func (m *Stats) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.WallTime) n += 1 + l + sovStats(uint64(l)) + if m.FetchedSeriesCount != 0 { + n += 1 + sovStats(uint64(m.FetchedSeriesCount)) + } + if m.FetchedChunkBytes != 0 { + n += 1 + sovStats(uint64(m.FetchedChunkBytes)) + } return n } @@ -204,6 +250,8 @@ func (this *Stats) String() string { } s := strings.Join([]string{`&Stats{`, `WallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.WallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, + `FetchedSeriesCount:` + fmt.Sprintf("%v", this.FetchedSeriesCount) + `,`, + `FetchedChunkBytes:` + fmt.Sprintf("%v", this.FetchedChunkBytes) + `,`, `}`, }, "") return s @@ -278,6 +326,44 @@ func (m *Stats) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FetchedSeriesCount", wireType) + } + m.FetchedSeriesCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FetchedSeriesCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FetchedChunkBytes", wireType) + } + m.FetchedChunkBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FetchedChunkBytes |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 3ec55448af..765dd99582 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -13,4 +13,8 @@ option (gogoproto.unmarshaler_all) = true; message Stats { // The sum of all wall time spent in the querier to execute the query. google.protobuf.Duration wall_time = 1 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + // The number of series fetched for the query + uint64 fetched_series_count = 2; + // The number of bytes of the chunks fetched for the query + uint64 fetched_chunk_bytes = 3; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go new file mode 100644 index 0000000000..f8a23c96c3 --- /dev/null +++ b/pkg/querier/stats/stats_test.go @@ -0,0 +1,91 @@ +package stats + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStats_WallTime(t *testing.T) { + t.Run("add and load wall time", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddWallTime(time.Second) + stats.AddWallTime(time.Second) + + assert.Equal(t, 2*time.Second, stats.LoadWallTime()) + }) + + t.Run("add and load wall time nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddWallTime(time.Second) + + assert.Equal(t, time.Duration(0), stats.LoadWallTime()) + }) +} + +func TestStats_AddFetchedSeries(t *testing.T) { + t.Run("add and load series", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddFetchedSeries(100) + stats.AddFetchedSeries(50) + + assert.Equal(t, uint64(150), stats.LoadFetchedSeries()) + }) + + t.Run("add and load series nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddFetchedSeries(50) + + assert.Equal(t, uint64(0), stats.LoadFetchedSeries()) + }) +} + +func TestStats_AddFetchedChunkBytes(t *testing.T) { + t.Run("add and load bytes", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddFetchedChunkBytes(4096) + stats.AddFetchedChunkBytes(4096) + + assert.Equal(t, uint64(8192), stats.LoadFetchedChunkBytes()) + }) + + t.Run("add and load bytes nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddFetchedChunkBytes(1024) + + assert.Equal(t, uint64(0), stats.LoadFetchedChunkBytes()) + }) +} + +func TestStats_Merge(t *testing.T) { + t.Run("merge two stats objects", func(t *testing.T) { + stats1 := &Stats{} + stats1.AddWallTime(time.Millisecond) + stats1.AddFetchedSeries(50) + stats1.AddFetchedChunkBytes(42) + + stats2 := &Stats{} + stats2.AddWallTime(time.Second) + stats2.AddFetchedSeries(60) + stats2.AddFetchedChunkBytes(100) + + stats1.Merge(stats2) + + assert.Equal(t, 1001*time.Millisecond, stats1.LoadWallTime()) + assert.Equal(t, uint64(110), stats1.LoadFetchedSeries()) + assert.Equal(t, uint64(142), stats1.LoadFetchedChunkBytes()) + }) + + t.Run("merge two nil stats objects", func(t *testing.T) { + var stats1 *Stats + var stats2 *Stats + + stats1.Merge(stats2) + + assert.Equal(t, time.Duration(0), stats1.LoadWallTime()) + assert.Equal(t, uint64(0), stats1.LoadFetchedSeries()) + assert.Equal(t, uint64(0), stats1.LoadFetchedChunkBytes()) + }) +}