Skip to content

Commit

Permalink
Creating Limits per LabelSet
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed May 14, 2024
1 parent fe2047e commit 0ae15ec
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 33 deletions.
38 changes: 26 additions & 12 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,12 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
}

type userTSDB struct {
db *tsdb.DB
userID string
activeSeries *ActiveSeries
seriesInMetric *metricCounter
limiter *Limiter
db *tsdb.DB
userID string
activeSeries *ActiveSeries
seriesInMetric *metricCounter
labelSetCounter *labelSetCounter
limiter *Limiter

instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
instanceLimitsFn func() *InstanceLimits
Expand Down Expand Up @@ -399,6 +400,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}

if err := u.labelSetCounter.canAddSeriesForLabelSet(context.TODO(), u, metric); err != nil {
return err
}

return nil
}

Expand All @@ -412,6 +417,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
return
}
u.seriesInMetric.increaseSeriesForMetric(metricName)
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
}

// PostDeletion implements SeriesLifecycleCallback interface.
Expand All @@ -425,6 +431,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
continue
}
u.seriesInMetric.decreaseSeriesForMetric(metricName)
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
}
}

Expand Down Expand Up @@ -924,6 +931,7 @@ func (i *Ingester) updateActiveSeries() {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
}
}

Expand Down Expand Up @@ -1100,38 +1108,43 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
switch cause := errors.Cause(err); cause {
case storage.ErrOutOfBounds:
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrOutOfOrderSample:
case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrDuplicateSampleForTimestamp:
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrTooOldSample:
case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errMaxSeriesPerUserLimitExceeded:
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
continue

case errMaxSeriesPerMetricLimitExceeded:
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
}

// The error looks an issue on our side, so we should rollback
Expand Down Expand Up @@ -2018,6 +2031,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
userID: userID,
activeSeries: NewActiveSeries(),
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
labelSetCounter: newLabelSetCounter(i.limiter),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),

Expand Down
Loading

0 comments on commit 0ae15ec

Please sign in to comment.