diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4f32ee4564f..1de83b7fb69 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -254,11 +254,12 @@ func (r tsdbCloseCheckResult) shouldClose() bool { } type userTSDB struct { - db *tsdb.DB - userID string - activeSeries *ActiveSeries - seriesInMetric *metricCounter - limiter *Limiter + db *tsdb.DB + userID string + activeSeries *ActiveSeries + seriesInMetric *metricCounter + labelSetCounter *labelSetCounter + limiter *Limiter instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. instanceLimitsFn func() *InstanceLimits @@ -399,6 +400,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error { return err } + if err := u.labelSetCounter.canAddSeriesForLabelSet(context.TODO(), u, metric); err != nil { + return err + } + return nil } @@ -412,6 +417,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { return } u.seriesInMetric.increaseSeriesForMetric(metricName) + u.labelSetCounter.increaseSeriesLabelSet(u, metric) } // PostDeletion implements SeriesLifecycleCallback interface. @@ -425,6 +431,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) continue } u.seriesInMetric.decreaseSeriesForMetric(metricName) + u.labelSetCounter.decreaseSeriesLabelSet(u, metric) } } @@ -924,6 +931,7 @@ func (i *Ingester) updateActiveSeries() { userDB.activeSeries.Purge(purgeTime) i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active())) + userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet) } } @@ -1100,38 +1108,43 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // of it, so that we can return it back to the distributor, which will return a // 400 error to the client. The client (Prometheus) will not retry on 400, and // we actually ingested all samples which haven't failed. - switch cause := errors.Cause(err); cause { - case storage.ErrOutOfBounds: + switch cause := errors.Cause(err); { + case errors.Is(cause, storage.ErrOutOfBounds): sampleOutOfBoundsCount++ updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) }) continue - case storage.ErrOutOfOrderSample: + case errors.Is(cause, storage.ErrOutOfOrderSample): sampleOutOfOrderCount++ updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) }) continue - case storage.ErrDuplicateSampleForTimestamp: + case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp): newValueForTimestampCount++ updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) }) continue - case storage.ErrTooOldSample: + case errors.Is(cause, storage.ErrTooOldSample): sampleTooOldCount++ updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) }) continue - case errMaxSeriesPerUserLimitExceeded: + case errors.Is(cause, errMaxSeriesPerUserLimitExceeded): perUserSeriesLimitCount++ updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) }) continue - case errMaxSeriesPerMetricLimitExceeded: + case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded): perMetricSeriesLimitCount++ updateFirstPartial(func() error { return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause)) }) continue + case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}): + updateFirstPartial(func() error { + return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause)) + }) + continue } // The error looks an issue on our side, so we should rollback @@ -2018,6 +2031,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { userID: userID, activeSeries: NewActiveSeries(), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), + labelSetCounter: newLabelSetCounter(i.limiter), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 4e0585b7122..4e2535f11a3 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3,6 +3,7 @@ package ingester import ( "bytes" "context" + "encoding/json" "fmt" "io" "math" @@ -76,6 +77,220 @@ func runTestQueryTimes(ctx context.Context, t *testing.T, ing *Ingester, ty labe return res, req, nil } +func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { + limits := defaultLimitsTestConfig() + userID := "1" + registry := prometheus.NewRegistry() + + limits.MaxSeriesPerLabelSet = []validation.MaxSeriesPerLabelSet{ + { + LabelSet: map[string]string{ + "label1": "value1", + }, + Limit: 3, + }, + { + LabelSet: map[string]string{ + "label2": "value2", + }, + Limit: 2, + }, + } + tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits}) + + b, err := json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + + dir := t.TempDir() + chunksDir := filepath.Join(dir, "chunks") + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) + require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) + + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() interface{} { + return ing.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), userID) + samples := []cortexpb.Sample{{Value: 2, TimestampMs: 10}} + + // Create first series within the limits + for _, set := range limits.MaxSeriesPerLabelSet { + lbls := []string{labels.MetricName, "metric_name"} + for k, v := range set.LabelSet { + lbls = append(lbls, k, v) + } + for i := 0; i < set.Limit; i++ { + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", fmt.Sprintf("extraValue%v", i))...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + } + } + + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + `), "cortex_ingester_active_series_per_labelset")) + + // Should impose limits + for _, set := range limits.MaxSeriesPerLabelSet { + lbls := []string{labels.MetricName, "metric_name"} + for k, v := range set.LabelSet { + lbls = append(lbls, k, v) + } + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "newLabel", "newValue")...)}, samples, nil, nil, cortexpb.API)) + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + require.ErrorContains(t, err, set.Id) + } + + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + `), "cortex_ingester_active_series_per_labelset")) + + // Should apply composite limits + limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet, + validation.MaxSeriesPerLabelSet{LabelSet: map[string]string{ + "comp1": "compValue1", + }, + Limit: 10, + }, + validation.MaxSeriesPerLabelSet{LabelSet: map[string]string{ + "comp2": "compValue2", + }, + Limit: 10, + }, + validation.MaxSeriesPerLabelSet{LabelSet: map[string]string{ + "comp1": "compValue1", + "comp2": "compValue2", + }, + Limit: 2, + }, + ) + + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + + for i := 0; i < 5; i++ { + lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1"} + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", fmt.Sprintf("extraValue%v", i))...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + } + + for i := 0; i < 2; i++ { + lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1", "comp2", "compValue2"} + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", fmt.Sprintf("extraValue%v", i))...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + } + + lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1", "comp2", "compValue2"} + + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "newLabel", "newValue")...)}, samples, nil, nil, cortexpb.API)) + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + require.ErrorContains(t, err, labels.FromStrings("comp1", "compValue1", "comp2", "compValue2").String()) + + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 2 + cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 7 + cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 2 + `), "cortex_ingester_active_series_per_labelset")) + + // Should bootstrap and apply limits when configuration change + limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet, + validation.MaxSeriesPerLabelSet{LabelSet: map[string]string{ + labels.MetricName: "metric_name", + "comp2": "compValue2", + }, + Limit: 3, // we already have 2 so we need to allow 1 more + }, + ) + + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + + lbls = []string{labels.MetricName, "metric_name", "comp2", "compValue2"} + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate")...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API)) + httpResp, ok = httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + require.ErrorContains(t, err, labels.FromStrings(lbls...).String()) + + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 2 + cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 7 + cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 3 + `), "cortex_ingester_active_series_per_labelset")) + + // Should remove metrics when the limits is removed + limits.MaxSeriesPerLabelSet = limits.MaxSeriesPerLabelSet[:2] + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + `), "cortex_ingester_active_series_per_labelset")) + + // Should persist between restarts + services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + registry = prometheus.NewRegistry() + ing, err = prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + ing.updateActiveSeries() + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset. + # TYPE cortex_ingester_active_series_per_labelset gauge + cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3 + cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2 + `), "cortex_ingester_active_series_per_labelset")) + services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + +} + func TestIngesterUserLimitExceeded(t *testing.T) { limits := defaultLimitsTestConfig() limits.MaxLocalSeriesPerUser = 1 @@ -89,7 +304,7 @@ func TestIngesterUserLimitExceeded(t *testing.T) { require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) blocksIngesterGenerator := func() *Ingester { - ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, blocksDir, nil) + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) // Wait until it's ACTIVE @@ -211,7 +426,7 @@ func TestIngesterMetricLimitExceeded(t *testing.T) { require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) blocksIngesterGenerator := func() *Ingester { - ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, blocksDir, nil) + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) // Wait until it's ACTIVE @@ -931,7 +1146,7 @@ func TestIngester_Push(t *testing.T) { limits := defaultLimitsTestConfig() limits.MaxExemplars = testData.maxExemplars limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow) - i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry) + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -1478,7 +1693,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { return instanceLimits } - ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, "", registry) + ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, nil, "", registry) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester)) defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck @@ -2464,10 +2679,10 @@ func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestamp } func prepareIngesterWithBlocksStorage(t testing.TB, ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, error) { - return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), "", registerer) + return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), nil, "", registerer) } -func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config, limits validation.Limits, dataDir string, registerer prometheus.Registerer) (*Ingester, error) { +func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config, limits validation.Limits, tenantLimits validation.TenantLimits, dataDir string, registerer prometheus.Registerer) (*Ingester, error) { // Create a data dir if none has been provided. if dataDir == "" { dataDir = t.TempDir() @@ -2475,7 +2690,7 @@ func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config, bucketDir := t.TempDir() - overrides, err := validation.NewOverrides(limits, nil) + overrides, err := validation.NewOverrides(limits, tenantLimits) if err != nil { return nil, err } @@ -4216,7 +4431,7 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) { dir := t.TempDir() blocksDir := filepath.Join(dir, "blocks") limits := defaultLimitsTestConfig() - i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, nil) + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, blocksDir, nil) require.NoError(t, err) maxExemplars := i.getMaxExemplars("someTenant") @@ -4224,7 +4439,7 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) { // set max exemplars value in limits, and re-initialize the ingester limits.MaxExemplars = 5 - i, err = prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, blocksDir, nil) + i, err = prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, blocksDir, nil) require.NoError(t, err) // validate this value is picked up now @@ -4246,3 +4461,35 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) } + +// mockTenantLimits exposes per-tenant limits based on a provided map +type mockTenantLimits struct { + limits map[string]*validation.Limits + m sync.Mutex +} + +// newMockTenantLimits creates a new mockTenantLimits that returns per-tenant limits based on +// the given map +func newMockTenantLimits(limits map[string]*validation.Limits) *mockTenantLimits { + return &mockTenantLimits{ + limits: limits, + } +} + +func (l *mockTenantLimits) ByUserID(userID string) *validation.Limits { + l.m.Lock() + defer l.m.Unlock() + return l.limits[userID] +} + +func (l *mockTenantLimits) AllByUserID() map[string]*validation.Limits { + l.m.Lock() + defer l.m.Unlock() + return l.limits +} + +func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) { + l.m.Lock() + defer l.m.Unlock() + l.limits[userID] = limits +} diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index f0d8d0ace40..daae2bd0cbb 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -2,6 +2,7 @@ package ingester import ( "fmt" + "github.com/prometheus/prometheus/model/labels" "math" "github.com/pkg/errors" @@ -17,6 +18,13 @@ var ( errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded") ) +type errMaxSeriesPerLabelSetLimitExceeded struct { + error + id string + localLimit int + globalLimit int +} + // RingCount is the interface exposed by a ring implementation which allows // to count members type RingCount interface { @@ -97,18 +105,42 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int return errMaxMetadataPerUserLimitExceeded } +func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error { + m := l.maxSeriesPerLabelSet(userID, metric) + for _, limit := range m { + maxFunc := func(string) int { + return limit.Limit + } + local := l.maxByLocalAndGlobal(userID, maxFunc, maxFunc) + if u, err := f(limit); err != nil { + return err + } else if u >= local { + return errMaxSeriesPerLabelSetLimitExceeded{ + id: limit.Id, + localLimit: local, + globalLimit: limit.Limit, + } + } + } + return nil +} + // FormatError returns the input error enriched with the actual limits for the given user. // It acts as pass-through if the input error is unknown. func (l *Limiter) FormatError(userID string, err error) error { - switch err { - case errMaxSeriesPerUserLimitExceeded: + switch { + case errors.Is(err, errMaxSeriesPerUserLimitExceeded): return l.formatMaxSeriesPerUserError(userID) - case errMaxSeriesPerMetricLimitExceeded: + case errors.Is(err, errMaxSeriesPerMetricLimitExceeded): return l.formatMaxSeriesPerMetricError(userID) - case errMaxMetadataPerUserLimitExceeded: + case errors.Is(err, errMaxMetadataPerUserLimitExceeded): return l.formatMaxMetadataPerUserError(userID) - case errMaxMetadataPerMetricLimitExceeded: + case errors.Is(err, errMaxMetadataPerMetricLimitExceeded): return l.formatMaxMetadataPerMetricError(userID) + case errors.As(err, &errMaxSeriesPerLabelSetLimitExceeded{}): + e := errMaxSeriesPerLabelSetLimitExceeded{} + errors.As(err, &e) + return l.formatMaxSeriesPerLabelSetError(e) default: return err } @@ -150,6 +182,27 @@ func (l *Limiter) formatMaxMetadataPerMetricError(userID string) error { minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit) } +func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLimitExceeded) error { + return fmt.Errorf("per-labelset series limit of %d exceeded (labelSet: %s, local limit: %d global limit: %d actual)", + minNonZero(err.globalLimit, err.localLimit), err.id, err.localLimit, err.globalLimit) +} + +func (l *Limiter) maxSeriesPerLabelSet(userID string, metric labels.Labels) []validation.MaxSeriesPerLabelSet { + m := l.limits.MaxSeriesPerLabelSet(userID) + r := make([]validation.MaxSeriesPerLabelSet, 0, len(m)) +outer: + for _, lbls := range m { + for name, value := range lbls.LabelSet { + // We did not find some of the labels on the set + if v := metric.Get(name); v != value { + continue outer + } + } + r = append(r, lbls) + } + return r +} + func (l *Limiter) maxSeriesPerMetric(userID string) int { localLimit := l.limits.MaxLocalSeriesPerMetric(userID) globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index ac06433bd36..39f8184c1af 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -37,7 +37,8 @@ type ingesterMetrics struct { memSeriesRemovedTotal *prometheus.CounterVec memMetadataRemovedTotal *prometheus.CounterVec - activeSeriesPerUser *prometheus.GaugeVec + activeSeriesPerUser *prometheus.GaugeVec + activeSeriesPerLabelSet *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -211,6 +212,11 @@ func newIngesterMetrics(r prometheus.Registerer, return 0 }), + activeSeriesPerLabelSet: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_per_labelset", + Help: "Number of currently active series per user and labelset.", + }, []string{"user", "labelset"}), + // Not registered automatically, but only if activeSeriesEnabled is true. activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_ingester_active_series", diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 57973e7568e..119a7c9d6b9 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -1,6 +1,11 @@ package ingester import ( + "context" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/index" "sync" "github.com/prometheus/common/model" @@ -11,8 +16,9 @@ import ( // DiscardedSamples metric labels const ( - perUserSeriesLimit = "per_user_series_limit" - perMetricSeriesLimit = "per_metric_series_limit" + perUserSeriesLimit = "per_user_series_limit" + perMetricSeriesLimit = "per_metric_series_limit" + perLabelsetSeriesLimit = "per_labelset_series_limit" ) const numMetricCounterShards = 128 @@ -78,3 +84,134 @@ func (m *metricCounter) increaseSeriesForMetric(metric string) { shard.m[metric]++ shard.mtx.Unlock() } + +type labelSetCounterEntry struct { + count int + labels labels.Labels +} + +type labelSetCounterShard struct { + *sync.RWMutex + valuesCounter map[uint64]*labelSetCounterEntry +} + +type labelSetCounter struct { + limiter *Limiter + shards []*labelSetCounterShard +} + +func newLabelSetCounter(limiter *Limiter) *labelSetCounter { + shards := make([]*labelSetCounterShard, 0, numMetricCounterShards) + for i := 0; i < numMetricCounterShards; i++ { + shards = append(shards, &labelSetCounterShard{ + RWMutex: &sync.RWMutex{}, + valuesCounter: map[uint64]*labelSetCounterEntry{}, + }) + } + return &labelSetCounter{ + shards: shards, + limiter: limiter, + } +} + +func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error { + return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.MaxSeriesPerLabelSet) (int, error) { + s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards] + s.RLock() + if r, ok := s.valuesCounter[set.Hash]; ok { + s.RUnlock() + return r.count, nil + } + s.RUnlock() + + // We still dont keep track of this label value so we need to backfill + ir, err := u.db.Head().Index() + defer ir.Close() + if err != nil { + return 0, err + } + + s.Lock() + defer s.Unlock() + if r, ok := s.valuesCounter[set.Hash]; !ok { + postings := make([]index.Postings, 0, len(set.LabelSet)) + for k, v := range set.LabelSet { + p, err := ir.Postings(ctx, k, v) + if err != nil { + return 0, err + } + postings = append(postings, p) + } + + p := index.Intersect(postings...) + + totalCount := 0 + for p.Next() { + totalCount++ + } + + if p.Err() != nil { + return 0, p.Err() + } + + s.valuesCounter[set.Hash] = &labelSetCounterEntry{ + count: totalCount, + labels: labels.FromMap(set.LabelSet), + } + return totalCount, nil + } else { + return r.count, nil + } + }) +} + +func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) { + limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric) + for _, l := range limits { + s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards] + s.Lock() + if e, ok := s.valuesCounter[l.Hash]; ok { + e.count++ + } else { + s.valuesCounter[l.Hash] = &labelSetCounterEntry{ + count: 1, + labels: labels.FromMap(l.LabelSet), + } + } + s.Unlock() + } +} + +func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labels) { + limits := m.limiter.maxSeriesPerLabelSet(u.userID, metric) + for _, l := range limits { + s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards] + s.Lock() + if e, ok := s.valuesCounter[l.Hash]; ok { + e.count-- + } + s.Unlock() + } +} + +func (m *labelSetCounter) UpdateMetric(u *userTSDB, vec *prometheus.GaugeVec) { + currentLbsLimitHash := map[uint64]struct{}{} + for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) { + currentLbsLimitHash[l.Hash] = struct{}{} + } + + for i := 0; i < numMetricCounterShards; i++ { + s := m.shards[i] + s.RLock() + for h, entry := range s.valuesCounter { + // This limit no longer ecists + if _, ok := currentLbsLimitHash[h]; !ok { + vec.DeleteLabelValues(u.userID, entry.labels.String()) + continue + } + + vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count)) + } + s.RUnlock() + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 7c27f075674..cb729b54a72 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "flag" + "github.com/prometheus/prometheus/model/labels" + "github.com/segmentio/fasthash/fnv1a" "math" "regexp" "strings" @@ -74,6 +76,13 @@ type TimeWindow struct { End model.Duration `yaml:"end" json:"end" doc:"nocli|description=End of the data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"` } +type MaxSeriesPerLabelSet struct { + Limit int `yaml:"limit" json:"limit" doc:"nocli"` + LabelSet map[string]string `yaml:"label_set" json:"label_set" doc:"nocli"` + Id string + Hash uint64 +} + // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { @@ -102,10 +111,11 @@ type Limits struct { // Ingester enforced limits. // Series - MaxLocalSeriesPerUser int `yaml:"max_series_per_user" json:"max_series_per_user"` - MaxLocalSeriesPerMetric int `yaml:"max_series_per_metric" json:"max_series_per_metric"` - MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"` - MaxGlobalSeriesPerMetric int `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"` + MaxLocalSeriesPerUser int `yaml:"max_series_per_user" json:"max_series_per_user"` + MaxLocalSeriesPerMetric int `yaml:"max_series_per_metric" json:"max_series_per_metric"` + MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"` + MaxGlobalSeriesPerMetric int `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"` + MaxSeriesPerLabelSet []MaxSeriesPerLabelSet `yaml:"max_series_per_label_set" json:"max_series_per_label_set" doc:"hidden"` // Metadata MaxLocalMetricsWithMetadataPerUser int `yaml:"max_metadata_per_user" json:"max_metadata_per_user"` @@ -285,6 +295,8 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } + l.calculateMaxSeriesPerLabelSetId() + return nil } @@ -311,9 +323,19 @@ func (l *Limits) UnmarshalJSON(data []byte) error { return err } + l.calculateMaxSeriesPerLabelSetId() + return nil } +func (l *Limits) calculateMaxSeriesPerLabelSetId() { + for k, limit := range l.MaxSeriesPerLabelSet { + limit.Id = labels.FromMap(limit.LabelSet).String() + limit.Hash = fnv1a.HashBytes64([]byte(limit.Id)) + l.MaxSeriesPerLabelSet[k] = limit + } +} + func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimitMap) { l.NotificationRateLimitPerIntegration = make(map[string]float64, len(defaults)) for k, v := range defaults { @@ -519,6 +541,11 @@ func (o *Overrides) MaxGlobalSeriesPerMetric(userID string) int { return o.GetOverridesForUser(userID).MaxGlobalSeriesPerMetric } +// MaxSeriesPerLabelSet returns the maximum number of series allowed per labelset across the cluster. +func (o *Overrides) MaxSeriesPerLabelSet(userID string) []MaxSeriesPerLabelSet { + return o.GetOverridesForUser(userID).MaxSeriesPerLabelSet +} + // MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching // chunks from the long-term storage. func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {