Proposal: Create a new "Per LabelSet" limit #5950

Merged · 4 commits · May 15, 2024
24 changes: 15 additions & 9 deletions docs/configuration/config-file-reference.md
@@ -3168,6 +3168,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-series-per-metric
[max_global_series_per_metric: <int> | default = 0]

# [Experimental] The maximum number of active series per LabelSet, across the
# cluster before replication. Empty list to disable.
[max_series_per_label_set: <list of MaxSeriesPerLabelSet> | default = []]

# The maximum number of active metrics with metadata per user, per ingester. 0
# to disable.
# CLI flag: -ingester.max-metadata-per-user
@@ -4009,7 +4013,7 @@ The `ruler_config` configures the Cortex ruler.
[external_url: <url> | default = ]

# Labels to add to all alerts.
[external_labels: <list of Label> | default = []]
[external_labels: <map of string (labelName) to string (labelValue)> | default = []]

ruler_client:
# gRPC client max receive message size (bytes).
@@ -5306,6 +5310,16 @@ otel:
[tls_insecure_skip_verify: <boolean> | default = false]
```

### `MaxSeriesPerLabelSet`

```yaml
# The maximum number of active series per LabelSet before replication.
[limit: <int> | default = ]

# The LabelSet to which the limit should be applied.
[label_set: <map of string (labelName) to string (labelValue)> | default = []]
```
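
As an illustration of how `max_series_per_label_set` and the `MaxSeriesPerLabelSet` block fit together, a tenant's limits could combine several entries like the sketch below. The label names and values are invented for the example, not taken from this PR.

```yaml
# Hypothetical per-tenant limits snippet; only max_series_per_label_set is shown.
max_series_per_label_set:
  # Cap active series labelled env="prod" at 100000 across the cluster
  # (counted before replication, as described above).
  - limit: 100000
    label_set:
      env: "prod"
  # A tighter cap for series carrying both of these labels.
  - limit: 10000
    label_set:
      env: "dev"
      team: "observability"
```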

### `PriorityDef`

```yaml
@@ -5350,11 +5364,3 @@ time_window:
# name of the rule group
[name: <string> | default = ""]
```

### `Label`

```yaml
[name: <string> | default = ""]

[value: <string> | default = ""]
```
47 changes: 35 additions & 12 deletions pkg/ingester/ingester.go
@@ -254,11 +254,12 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
}

type userTSDB struct {
db *tsdb.DB
userID string
activeSeries *ActiveSeries
seriesInMetric *metricCounter
limiter *Limiter
db *tsdb.DB
userID string
activeSeries *ActiveSeries
seriesInMetric *metricCounter
labelSetCounter *labelSetCounter
limiter *Limiter

instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
instanceLimitsFn func() *InstanceLimits
@@ -399,6 +400,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}

if err := u.labelSetCounter.canAddSeriesForLabelSet(context.TODO(), u, metric); err != nil {
return err
}

return nil
}

@@ -412,6 +417,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
return
}
u.seriesInMetric.increaseSeriesForMetric(metricName)
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
}

// PostDeletion implements SeriesLifecycleCallback interface.
@@ -425,6 +431,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
continue
}
u.seriesInMetric.decreaseSeriesForMetric(metricName)
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
}
}

@@ -713,6 +720,15 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
}
i.limiter = NewLimiter(
limits,
i.lifecycler,
cfg.DistributorShardingStrategy,
cfg.DistributorShardByAllLabels,
cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
cfg.AdminLimitMessage,
)
i.metrics = newIngesterMetrics(registerer,
false,
false,
@@ -924,6 +940,7 @@ func (i *Ingester) updateActiveSeries() {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
}
}

@@ -1100,38 +1117,43 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
switch cause := errors.Cause(err); cause {
case storage.ErrOutOfBounds:
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrOutOfOrderSample:
case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrDuplicateSampleForTimestamp:
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrTooOldSample:
case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errMaxSeriesPerUserLimitExceeded:
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
continue

case errMaxSeriesPerMetricLimitExceeded:
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
updateFirstPartial(func() error {
Contributor:

Don't we want to create a new discarded samples reason label here?

Member Author:

Indeed. I will create another PR with this and the changelog in a bit.

return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
}

// The error looks an issue on our side, so we should rollback
@@ -2018,6 +2040,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
userID: userID,
activeSeries: NewActiveSeries(),
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
labelSetCounter: newLabelSetCounter(i.limiter),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
