Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed May 24, 2023
1 parent 4436943 commit bac7b1f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 80 deletions.
97 changes: 47 additions & 50 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,30 @@ var (
)

const (
reasonRequestBodyTooLarge = "request_body_too_large"
reasonResponseTooLarge = "response_too_large"
reasonTooManyRequests = "too_many_requests"
reasonTooLongRange = "too_long_range"
reasonTooManySamples = "too_many_samples"
reasonSeriesFetched = "series_fetched"
reasonChunksFetched = "chunks_fetched"
reasonChunkBytesFetched = "chunk_bytes_fetched"
reasonDataBytesFetched = "data_bytes_fetched"
reasonSeriesLimitStoreGateway = "series_limit_store_gateway"
reasonChunksLimitStoreGateway = "chunks_limit_store_gateway"
reasonBytesLimitStoreGateway = "bytes_limit_store_gateway"
)

var (
LimitTooManySamples = `query processing would load too many samples into memory`
LimitTooLongRange = `the query time range exceeds the limit`
LimitSeriesFetched = `the query hit the max number of series limit`
LimitChunksFetched = `the query hit the max number of chunks limit`
LimitChunkBytesFetched = `the query hit the aggregated chunks size limit`
LimitDataBytesFetched = `the query hit the aggregated data size limit`
reasonRequestBodySizeExceeded = "request_body_size_exceeded"
reasonResponseBodySizeExceeded = "response_body_size_exceeded"
reasonTooManyRequests = "too_many_requests"
reasonTimeRangeExceeded = "time_range_exceeded"
reasonTooManySamples = "too_many_samples"
reasonSeriesFetched = "series_fetched"
reasonChunksFetched = "chunks_fetched"
reasonChunkBytesFetched = "chunk_bytes_fetched"
reasonDataBytesFetched = "data_bytes_fetched"
reasonSeriesLimitStoreGateway = "store_gateway_series_limit"
reasonChunksLimitStoreGateway = "store_gateway_chunks_limit"
reasonBytesLimitStoreGateway = "store_gateway_bytes_limit"

limitTooManySamples = `query processing would load too many samples into memory`
limitTimeRangeExceeded = `the query time range exceeds the limit`
limitSeriesFetched = `the query hit the max number of series limit`
limitChunksFetched = `the query hit the max number of chunks limit`
limitChunkBytesFetched = `the query hit the aggregated chunks size limit`
limitDataBytesFetched = `the query hit the aggregated data size limit`

// Store gateway limits.
LimitSeriesStoreGateway = `exceeded series limit`
LimitChunksStoreGateway = `exceeded chunks limit`
LimitBytesStoreGateway = `exceeded bytes limit`
limitSeriesStoreGateway = `exceeded series limit`
limitChunksStoreGateway = `exceeded chunks limit`
limitBytesStoreGateway = `exceeded bytes limit`
)

// Config for a Handler.
Expand All @@ -91,13 +89,13 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
queriesCount *prometheus.CounterVec
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
discardedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
queriesCount *prometheus.CounterVec
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryChunkBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
rejectedQueries *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
Expand Down Expand Up @@ -134,10 +132,10 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Size of all data fetched to execute a query in bytes.",
}, []string{"user"})

h.discardedQueries = prometheus.NewCounterVec(
h.rejectedQueries = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_discarded_queries_total",
Help: "The total number of queries that were discarded.",
Name: "cortex_rejected_queries_total",
Help: "The total number of queries that were rejected.",
},
[]string{"reason", "user"},
)
Expand All @@ -148,8 +146,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
h.querySeries.DeleteLabelValues(user)
h.queryChunkBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
if err := util.DeleteMatchingLabels(h.discardedQueries, map[string]string{"user": user}); err != nil {
level.Warn(log).Log("msg", "failed to remove cortex_discarded_queries_total metric for user", "user", user, "err", err)
if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
level.Warn(log).Log("msg", "failed to remove cortex_rejected_queries_total metric for user", "user", user, "err", err)
}
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
Expand Down Expand Up @@ -198,7 +196,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
writeError(w, err)
if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) {
f.discardedQueries.WithLabelValues(reasonRequestBodyTooLarge, userID).Inc()
f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID).Inc()
}
return
}
Expand Down Expand Up @@ -360,35 +358,34 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
}

var reason string
// 413, 422 or 429.
if statusCode == http.StatusTooManyRequests {
reason = reasonTooManyRequests
} else if statusCode == http.StatusRequestEntityTooLarge {
reason = reasonResponseTooLarge
reason = reasonResponseBodySizeExceeded
} else if statusCode == http.StatusUnprocessableEntity {
errMsg := error.Error()
if strings.Contains(errMsg, LimitTooManySamples) {
if strings.Contains(errMsg, limitTooManySamples) {
reason = reasonTooManySamples
} else if strings.Contains(errMsg, LimitTooLongRange) {
reason = reasonTooLongRange
} else if strings.Contains(errMsg, LimitSeriesFetched) {
} else if strings.Contains(errMsg, limitTimeRangeExceeded) {
reason = reasonTimeRangeExceeded
} else if strings.Contains(errMsg, limitSeriesFetched) {
reason = reasonSeriesFetched
} else if strings.Contains(errMsg, LimitChunksFetched) {
} else if strings.Contains(errMsg, limitChunksFetched) {
reason = reasonChunksFetched
} else if strings.Contains(errMsg, LimitChunkBytesFetched) {
} else if strings.Contains(errMsg, limitChunkBytesFetched) {
reason = reasonChunkBytesFetched
} else if strings.Contains(errMsg, LimitDataBytesFetched) {
} else if strings.Contains(errMsg, limitDataBytesFetched) {
reason = reasonDataBytesFetched
} else if strings.Contains(errMsg, LimitSeriesStoreGateway) {
} else if strings.Contains(errMsg, limitSeriesStoreGateway) {
reason = reasonSeriesLimitStoreGateway
} else if strings.Contains(errMsg, LimitChunksStoreGateway) {
} else if strings.Contains(errMsg, limitChunksStoreGateway) {
reason = reasonChunksLimitStoreGateway
} else if strings.Contains(errMsg, LimitBytesStoreGateway) {
} else if strings.Contains(errMsg, limitBytesStoreGateway) {
reason = reasonBytesLimitStoreGateway
}
}
if len(reason) > 0 {
f.discardedQueries.WithLabelValues(reason, userID).Inc()
f.rejectedQueries.WithLabelValues(reason, userID).Inc()
}
}

Expand Down
40 changes: 20 additions & 20 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonResponseTooLarge, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -96,7 +96,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonTooManyRequests, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -107,11 +107,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitTooManySamples)),
Body: io.NopCloser(strings.NewReader(limitTooManySamples)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonTooManySamples, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -122,11 +122,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitTooLongRange)),
Body: io.NopCloser(strings.NewReader(limitTimeRangeExceeded)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonTooLongRange, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -137,11 +137,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitSeriesFetched)),
Body: io.NopCloser(strings.NewReader(limitSeriesFetched)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonSeriesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -152,11 +152,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitChunksFetched)),
Body: io.NopCloser(strings.NewReader(limitChunksFetched)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonChunksFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -167,11 +167,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitChunkBytesFetched)),
Body: io.NopCloser(strings.NewReader(limitChunkBytesFetched)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonChunkBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -182,11 +182,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitDataBytesFetched)),
Body: io.NopCloser(strings.NewReader(limitDataBytesFetched)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonDataBytesFetched, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -197,11 +197,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitSeriesStoreGateway)),
Body: io.NopCloser(strings.NewReader(limitSeriesStoreGateway)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -212,11 +212,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitChunksStoreGateway)),
Body: io.NopCloser(strings.NewReader(limitChunksStoreGateway)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, userID))
assert.Equal(t, float64(1), v)
},
},
Expand All @@ -227,11 +227,11 @@ func TestHandler_ServeHTTP(t *testing.T) {
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
Body: io.NopCloser(strings.NewReader(LimitBytesStoreGateway)),
Body: io.NopCloser(strings.NewReader(limitBytesStoreGateway)),
}, nil
}),
additionalMetricsCheckFunc: func(h *Handler) {
v := promtest.ToFloat64(h.discardedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID))
v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, userID))
assert.Equal(t, float64(1), v)
},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ message Stats {
uint64 fetched_chunks_count = 6;
// The number of samples fetched for the query
uint64 fetched_samples_count = 7;
// The limit hit when executing the query
string limit_hit = 8 [(gogoproto.nullable) = true];
}
10 changes: 0 additions & 10 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,10 @@ var DiscardedMetadata = prometheus.NewCounterVec(
[]string{discardReasonLabel, "user"},
)

// DiscardedQueries is a metric of the number of discarded queries, by reason.
var DiscardedQueries = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_discarded_query_total",
Help: "The total number of queries that were discarded.",
},
[]string{discardReasonLabel, "user"},
)

func init() {
prometheus.MustRegister(DiscardedSamples)
prometheus.MustRegister(DiscardedExemplars)
prometheus.MustRegister(DiscardedMetadata)
prometheus.MustRegister(DiscardedQueries)
}

// ValidateSample returns an err if the sample is invalid.
Expand Down

0 comments on commit bac7b1f

Please sign in to comment.