Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement max bucket limit and automatic resolution reduction #6104

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit the maximum number of buckets in a native histogram. The Distributor will try to automatically reduce the histogram resolution until it is within the bucket limit or the resolution cannot be reduced any further. #6104
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
18 changes: 13 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3159,11 +3159,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# e.g. remote_write.write_relabel_configs.
[metric_relabel_configs: <relabel_config...> | default = []]

# Enables support for exemplars in TSDB and sets the maximum number that will be
# stored. less than zero means disabled. If the value is set to zero, cortex
# will fallback to blocks-storage.tsdb.max-exemplars value.
# CLI flag: -ingester.max-exemplars
[max_exemplars: <int> | default = 0]
# Limit on total number of positive and negative buckets allowed in a single
# native histogram. The resolution of a histogram with more buckets will be
# reduced until the number of buckets is within the limit. If the limit cannot
# be reached, the sample will be discarded. 0 means no limit. Enforced at
# Distributor.
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]

# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
Expand Down Expand Up @@ -3213,6 +3215,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.out-of-order-time-window
[out_of_order_time_window: <duration> | default = 0s]

# Enables support for exemplars in TSDB and sets the maximum number that will be
# stored. less than zero means disabled. If the value is set to zero, cortex
# will fallback to blocks-storage.tsdb.max-exemplars value.
# CLI flag: -ingester.max-exemplars
[max_exemplars: <int> | default = 0]

# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. 0 to disable.
Expand Down
5 changes: 5 additions & 0 deletions pkg/cortexpb/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ package cortexpb

import "github.com/prometheus/prometheus/model/histogram"

// Schema bounds for native histograms that use exponential bucketing.
const (
	// ExponentialSchemaMin is the smallest schema value. Bucket-limit
	// validation treats a histogram at or below this schema as one whose
	// resolution cannot be reduced any further.
	ExponentialSchemaMin int32 = -4
	// ExponentialSchemaMax is the largest schema value.
	// NOTE(review): presumably mirrors the Prometheus exponential schema
	// upper bound — confirm against the Prometheus histogram package.
	ExponentialSchemaMax int32 = 8
)

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
Expand Down
10 changes: 7 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,16 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
for _, h := range ts.Histograms {
// TODO(yeya24): add other validations for native histogram.
// For example, Prometheus scrape has bucket limit and schema check.
for i, h := range ts.Histograms {
if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, h.TimestampMs); err != nil {
return emptyPreallocSeries, err
}
// TODO(yeya24): add max schema validation for native histogram if needed.
convertedHistogram, err := validation.ValidateNativeHistogram(d.validateMetrics, limits, userID, ts.Labels, h)
if err != nil {
return emptyPreallocSeries, err
}
ts.Histograms[i] = convertedHistogram
}
histograms = append(histograms, ts.Histograms...)
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,24 @@ func newExemplarLabelLengthError(seriesLabels []cortexpb.LabelAdapter, exemplarL
}
}

// histogramBucketLimitExceededError is the ValidationError reported for a
// native histogram sample that has more buckets than the configured limit and
// whose resolution cannot be lowered any further to fit within it.
type histogramBucketLimitExceededError struct {
	// series holds the labels of the offending series.
	series []cortexpb.LabelAdapter
	// limit is the bucket limit that was exceeded.
	limit int
}

// newHistogramBucketLimitExceededError builds the error for the given series
// labels and bucket limit.
func newHistogramBucketLimitExceededError(series []cortexpb.LabelAdapter, limit int) ValidationError {
	bucketErr := new(histogramBucketLimitExceededError)
	bucketErr.series = series
	bucketErr.limit = limit
	return bucketErr
}

// Error renders the configured limit followed by the formatted label set of
// the series. The %.200q verb caps how much of the label set is printed.
func (e *histogramBucketLimitExceededError) Error() string {
	const format = "native histogram bucket count exceeded for metric (limit: %d) metric: %.200q"
	metric := formatLabelSet(e.series)
	return fmt.Sprintf(format, e.limit, metric)
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type Limits struct {
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."`
MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"`
MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"`

// Ingester enforced limits.
// Series
Expand All @@ -151,6 +151,8 @@ type Limits struct {
MaxGlobalMetadataPerMetric int `yaml:"max_global_metadata_per_metric" json:"max_global_metadata_per_metric"`
// Out-of-order
OutOfOrderTimeWindow model.Duration `yaml:"out_of_order_time_window" json:"out_of_order_time_window"`
// Exemplars
MaxExemplars int `yaml:"max_exemplars" json:"max_exemplars"`

// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"`
Expand Down Expand Up @@ -232,6 +234,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.")
f.IntVar(&l.MaxNativeHistogramBuckets, "validation.max-native-histogram-buckets", 0, "Limit on total number of positive and negative buckets allowed in a single native histogram. The resolution of a histogram with more buckets will be reduced until the number of buckets is within the limit. If the limit cannot be reached, the sample will be discarded. 0 means no limit. Enforced at Distributor.")

f.IntVar(&l.MaxLocalSeriesPerUser, "ingester.max-series-per-user", 5000000, "The maximum number of active series per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxLocalSeriesPerMetric, "ingester.max-series-per-metric", 50000, "The maximum number of active series per metric name, per ingester. 0 to disable.")
Expand Down Expand Up @@ -722,6 +725,12 @@ func (o *Overrides) EnforceMetadataMetricName(userID string) bool {
return o.GetOverridesForUser(userID).EnforceMetadataMetricName
}

// MaxNativeHistogramBuckets reports, for the given user, the configured cap on
// the combined number of positive and negative buckets in a single native
// histogram.
func (o *Overrides) MaxNativeHistogramBuckets(userID string) int {
	userLimits := o.GetOverridesForUser(userID)
	return userLimits.MaxNativeHistogramBuckets
}

// MaxLocalMetricsWithMetadataPerUser returns the maximum number of metrics with metadata a user is allowed to store in a single ingester.
func (o *Overrides) MaxLocalMetricsWithMetadataPerUser(userID string) int {
return o.GetOverridesForUser(userID).MaxLocalMetricsWithMetadataPerUser
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
exemplarLabelsTooLong = "exemplar_labels_too_long"
exemplarTimestampInvalid = "exemplar_timestamp_invalid"

// Native Histogram specific validation reasons
nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded"

// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"
Expand Down Expand Up @@ -262,6 +265,59 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri
return nil
}

// ValidateNativeHistogram enforces limits.MaxNativeHistogramBuckets on a single
// native histogram sample.
//
// A limit of 0 disables the check. A sample whose positive plus negative bucket
// count is within the limit is returned untouched. Otherwise the sample is
// decoded and its schema is lowered one step at a time until the bucket count
// fits; the reduced histogram is then re-encoded with the original timestamp
// and returned.
//
// If the schema is already at or below cortexpb.ExponentialSchemaMin while the
// bucket count still exceeds the limit, the DiscardedSamples counter is
// incremented for userID and an empty histogram is returned together with a
// bucket-limit validation error built from ls.
func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogram cortexpb.Histogram) (cortexpb.Histogram, error) {
	maxBuckets := limits.MaxNativeHistogramBuckets
	if maxBuckets == 0 {
		return histogram, nil
	}

	// discard counts the rejected sample and builds the validation error; it is
	// shared by every rejection path below.
	discard := func() (cortexpb.Histogram, error) {
		validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, userID).Inc()
		return cortexpb.Histogram{}, newHistogramBucketLimitExceededError(ls, maxBuckets)
	}

	if !histogram.IsFloatHistogram() {
		// Integer histogram: count buckets on the encoded form first so that
		// samples within the limit skip the decode entirely.
		if len(histogram.PositiveDeltas)+len(histogram.NegativeDeltas) <= maxBuckets {
			return histogram, nil
		}
		// Over the limit and already at the minimum schema: nothing to reduce.
		if histogram.Schema <= cortexpb.ExponentialSchemaMin {
			return discard()
		}
		decoded := cortexpb.HistogramProtoToHistogram(histogram)
		for len(decoded.PositiveBuckets)+len(decoded.NegativeBuckets) > maxBuckets {
			if decoded.Schema <= cortexpb.ExponentialSchemaMin {
				return discard()
			}
			decoded = decoded.ReduceResolution(decoded.Schema - 1)
		}
		// Re-encode the reduced histogram, keeping the sample timestamp.
		return cortexpb.HistogramToHistogramProto(histogram.TimestampMs, decoded), nil
	}

	// Float histogram: same procedure, using the float bucket counts.
	if len(histogram.PositiveCounts)+len(histogram.NegativeCounts) <= maxBuckets {
		return histogram, nil
	}
	// Over the limit and already at the minimum schema: nothing to reduce.
	if histogram.Schema <= cortexpb.ExponentialSchemaMin {
		return discard()
	}
	decoded := cortexpb.FloatHistogramProtoToFloatHistogram(histogram)
	for len(decoded.PositiveBuckets)+len(decoded.NegativeBuckets) > maxBuckets {
		if decoded.Schema <= cortexpb.ExponentialSchemaMin {
			return discard()
		}
		decoded = decoded.ReduceResolution(decoded.Schema - 1)
	}
	// Re-encode the reduced float histogram, keeping the sample timestamp.
	return cortexpb.FloatHistogramToHistogramProto(histogram.TimestampMs, decoded), nil
}

func DeletePerUserValidationMetrics(validateMetrics *ValidateMetrics, userID string, log log.Logger) {
filter := map[string]string{"user": userID}

Expand Down
109 changes: 109 additions & 0 deletions pkg/util/validation/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -291,3 +293,110 @@ func TestValidateLabelDuplication(t *testing.T) {
}, "a")
assert.Equal(t, expected, actual)
}

// TestValidateNativeHistogram checks ValidateNativeHistogram for integer and
// float histograms across four situations: no limit, within the limit, over the
// limit with a successful resolution reduction, and over the limit with no
// further reduction possible (which must return an error and count one
// discarded sample).
func TestValidateNativeHistogram(t *testing.T) {
	tenantID := "fake"
	seriesLabels := cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar"))

	// The generated test histograms carry 4 positive and 4 negative buckets,
	// i.e. 8 in total, with the schema set to 1.
	intHist := tsdbutil.GenerateTestHistogram(0)
	floatHist := tsdbutil.GenerateTestFloatHistogram(0)

	// Variants pinned to the minimum schema, so no reduction is possible.
	minSchemaIntHist := tsdbutil.GenerateTestHistogram(0)
	minSchemaIntHist.Schema = cortexpb.ExponentialSchemaMin
	minSchemaFloatHist := tsdbutil.GenerateTestFloatHistogram(0)
	minSchemaFloatHist.Schema = cortexpb.ExponentialSchemaMin

	type testCase struct {
		name              string
		bucketLimit       int
		histogram         cortexpb.Histogram
		expectedHistogram cortexpb.Histogram
		expectedErr       error
	}

	cases := []testCase{
		{
			name:              "no limit, histogram",
			histogram:         cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
			expectedHistogram: cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
		},
		{
			name:              "no limit, float histogram",
			histogram:         cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
			expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
		},
		{
			name:              "within limit, histogram",
			bucketLimit:       8,
			histogram:         cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
			expectedHistogram: cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
		},
		{
			name:              "within limit, float histogram",
			bucketLimit:       8,
			histogram:         cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
			expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
		},
		{
			name:              "exceed limit and reduce resolution for 1 level, histogram",
			bucketLimit:       6,
			histogram:         cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
			expectedHistogram: cortexpb.HistogramToHistogramProto(0, intHist.Copy().ReduceResolution(0)),
		},
		{
			name:              "exceed limit and reduce resolution for 1 level, float histogram",
			bucketLimit:       6,
			histogram:         cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
			expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy().ReduceResolution(0)),
		},
		{
			name:              "exceed limit and reduce resolution for 2 levels, histogram",
			bucketLimit:       4,
			histogram:         cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
			expectedHistogram: cortexpb.HistogramToHistogramProto(0, intHist.Copy().ReduceResolution(-1)),
		},
		{
			name:              "exceed limit and reduce resolution for 2 levels, float histogram",
			bucketLimit:       4,
			histogram:         cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
			expectedHistogram: cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy().ReduceResolution(-1)),
		},
		{
			name:        "exceed limit but cannot reduce resolution further, histogram",
			bucketLimit: 1,
			histogram:   cortexpb.HistogramToHistogramProto(0, intHist.Copy()),
			expectedErr: newHistogramBucketLimitExceededError(seriesLabels, 1),
		},
		{
			name:        "exceed limit but cannot reduce resolution further, float histogram",
			bucketLimit: 1,
			histogram:   cortexpb.FloatHistogramToHistogramProto(0, floatHist.Copy()),
			expectedErr: newHistogramBucketLimitExceededError(seriesLabels, 1),
		},
		{
			name:        "exceed limit but cannot reduce resolution further with min schema, histogram",
			bucketLimit: 4,
			histogram:   cortexpb.HistogramToHistogramProto(0, minSchemaIntHist.Copy()),
			expectedErr: newHistogramBucketLimitExceededError(seriesLabels, 4),
		},
		{
			name:        "exceed limit but cannot reduce resolution further with min schema, float histogram",
			bucketLimit: 4,
			histogram:   cortexpb.FloatHistogramToHistogramProto(0, minSchemaFloatHist.Copy()),
			expectedErr: newHistogramBucketLimitExceededError(seriesLabels, 4),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Fresh registry per case so the discarded-samples counter starts at zero.
			registry := prometheus.NewRegistry()
			metrics := NewValidateMetrics(registry)
			cfg := new(Limits)
			cfg.MaxNativeHistogramBuckets = tc.bucketLimit

			gotHistogram, gotErr := ValidateNativeHistogram(metrics, cfg, tenantID, seriesLabels, tc.histogram)
			if tc.expectedErr == nil {
				require.NoError(t, gotErr)
				require.Equal(t, tc.expectedHistogram, gotHistogram)
				return
			}

			require.Equal(t, tc.expectedErr, gotErr)
			discarded := testutil.ToFloat64(metrics.DiscardedSamples.WithLabelValues(nativeHistogramBucketCountLimitExceeded, tenantID))
			require.Equal(t, float64(1), discarded)
		})
	}
}
Loading