Ingester bounds (#3992)
* Added global ingester limits.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Add tests for global limits.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Expose current limits used by ingester via metrics.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Add max inflight requests limit.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added test for inflight push requests.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Docs.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Debug log.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Test for unmarshalling.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Nil default global limits.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Expose current ingestion rate as gauge.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Expose number of inflight requests.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Change ewmaRate to use RWMutex.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Rename globalLimits to instanceLimits.
Rename max_users to max_tenants.
Removed extra parameter to `getOrCreateTSDB`

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Rename globalLimits to instanceLimits, fix users -> tenants, explain that these limits only work when using blocks engine.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Rename globalLimits to instanceLimits, fix users -> tenants, explain that these limits only work when using blocks engine.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Remove details from error messages.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Comment.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix series count when closing non-empty TSDB.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added new failure modes to benchmark.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fixed docs.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Tick every second.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix CHANGELOG.md

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Remove forgotten fmt.Println.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Use error variables.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pstibrany authored Apr 9, 2021
1 parent f107e5d commit 7f85a26
Showing 11 changed files with 786 additions and 86 deletions.
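For orientation before the per-file diffs: the hunks below reference an `InstanceLimits` struct, a `defaultInstanceLimits` variable, and several sentinel errors whose definitions live in changed files not shown in this excerpt. The sketch below reconstructs them from the flag registrations and config docs in this diff; the yaml tags and error messages are assumptions, not verbatim from the PR.

```go
package ingester

import "errors"

// InstanceLimits holds the per-ingester limits referenced throughout this diff.
// Field names match the flag registrations in ingester.go; yaml tags are assumed
// to match the keys documented in config-file-reference.md. Zero means unlimited.
type InstanceLimits struct {
	MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
	MaxInMemoryTenants      int64   `yaml:"max_tenants"`
	MaxInMemorySeries       int64   `yaml:"max_series"`
	MaxInflightPushRequests int64   `yaml:"max_inflight_push_requests"`
}

// defaultInstanceLimits is set from the flag defaults in New() and used as a
// fallback when no runtime-config override is available (see getInstanceLimits).
var defaultInstanceLimits *InstanceLimits

// Sentinel errors returned when a limit is hit ("Use error variables" commit).
// Only the variable names appear in the diff; the messages here are placeholders.
var (
	errMaxSamplesPushRateLimitReached = errors.New("ingester max samples push rate reached")
	errMaxUsersLimitReached           = errors.New("ingester max tenants limit reached")
	errMaxSeriesLimitReached          = errors.New("ingester max series limit reached")
	errTooManyInflightPushRequests    = errors.New("too many inflight push requests in ingester")
)
```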
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@
* [ENHANCEMENT] Allow use of `y|w|d` suffixes for duration related limits and per-tenant limits. #4044
* [ENHANCEMENT] Query-frontend: Small optimization on top of PR #3968 to avoid unnecessary Extents merging. #4026
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
* [ENHANCEMENT] Ingester: added the following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of tenants in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight push requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used with the blocks storage engine. The limits can also be configured via the runtime-config feature, and the current values are exported as the `cortex_ingester_instance_limits` metric. #3992
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
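The changelog entry above notes that the configured values are exported as a `cortex_ingester_instance_limits` metric. As a rough illustration of how such a gauge can be published with client_golang: the `limit` label name and its values are assumptions, and the PR itself wires the limits function into `newIngesterMetrics`, so the exported values track runtime-config changes rather than being registered once as below.

```go
package ingester

import "github.com/prometheus/client_golang/prometheus"

// Illustrative only: export the currently configured instance limits as a gauge.
// In the PR, newIngesterMetrics receives instanceLimitsFn so values are read at
// scrape time; this static sketch captures them once at registration.
func registerInstanceLimitMetrics(reg prometheus.Registerer, limitsFn func() *InstanceLimits) {
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ingester_instance_limits",
		Help: "Per-ingester instance limits.",
	}, []string{"limit"}) // label name assumed
	reg.MustRegister(g)

	if l := limitsFn(); l != nil {
		g.WithLabelValues("max_ingestion_rate").Set(l.MaxIngestionRate)
		g.WithLabelValues("max_tenants").Set(float64(l.MaxInMemoryTenants))
		g.WithLabelValues("max_series").Set(float64(l.MaxInMemorySeries))
		g.WithLabelValues("max_inflight_push_requests").Set(float64(l.MaxInflightPushRequests))
	}
}
```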
25 changes: 25 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -771,6 +771,31 @@ lifecycler:
# After what time a series is considered to be inactive.
# CLI flag: -ingester.active-series-metrics-idle-timeout
[active_series_metrics_idle_timeout: <duration> | default = 10m]
instance_limits:
# Max ingestion rate (samples/sec) that ingester will accept. This limit is
# per-ingester, not per-tenant. Additional push requests will be rejected.
# Current ingestion rate is computed as exponentially weighted moving average,
# updated every second. This limit only works when using blocks engine. 0 =
# unlimited.
# CLI flag: -ingester.instance-limits.max-ingestion-rate
[max_ingestion_rate: <float> | default = 0]
# Max users that this ingester can hold. Requests from additional users will
# be rejected. This limit only works when using blocks engine. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-tenants
[max_tenants: <int> | default = 0]
# Max series that this ingester can hold (across all tenants). Requests to
# create additional series will be rejected. This limit only works when using
# blocks engine. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-series
[max_series: <int> | default = 0]
# Max inflight push requests that this ingester can handle (across all
# tenants). Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
```

### `querier_config`
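The `max_ingestion_rate` documentation above relies on an exponentially weighted moving average that is updated once per second (see the 1-second ticker added to `updateLoop` further down, and the "Change ewmaRate to use RWMutex" commit). A minimal sketch of that pattern, not the actual `ewmaRate` type:

```go
package ingester

import (
	"sync"

	"go.uber.org/atomic"
)

// ewmaRateSketch approximates the ingester's ewmaRate: pushes call add(), a
// ticker calls tick() every interval seconds, and the limit check calls rate().
type ewmaRateSketch struct {
	newEvents atomic.Int64 // samples accumulated since the last tick

	mtx  sync.RWMutex // guards curr; rate() takes only a read lock
	curr float64
	init bool

	alpha    float64 // smoothing factor, e.g. 0.2 as used in this PR
	interval float64 // seconds between ticks, 1s in this PR
}

func (r *ewmaRateSketch) add(n int64) { r.newEvents.Add(n) }

func (r *ewmaRateSketch) tick() {
	instant := float64(r.newEvents.Swap(0)) / r.interval

	r.mtx.Lock()
	defer r.mtx.Unlock()
	if r.init {
		r.curr += r.alpha * (instant - r.curr) // fold the last interval into the EWMA
	} else {
		r.curr, r.init = instant, true
	}
}

func (r *ewmaRateSketch) rate() float64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	return r.curr
}
```

Using an RWMutex means the per-request `rate()` check in `v2Push` only takes a read lock, so concurrent pushes do not serialize on the once-per-second `tick()`.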
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -424,6 +424,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger)
16 changes: 16 additions & 0 deletions pkg/cortex/runtime_config.go
@@ -27,6 +27,8 @@ type runtimeConfigValues struct {
Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`

IngesterLimits ingester.InstanceLimits `yaml:"ingester_limits"`
}

// runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
@@ -124,6 +126,20 @@ func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.Quer
}
}

func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits {
if manager == nil {
return nil
}

return func() *ingester.InstanceLimits {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
return &cfg.IngesterLimits
}
return nil
}
}

func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
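For context, a sketch of what the new `ingester_limits` block looks like in a runtime-config file and how it decodes into the struct returned by `ingesterInstanceLimits` above. The yaml keys come from the config docs in this diff; the example values, the standalone struct, and the use of `gopkg.in/yaml.v2` are illustrative assumptions.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// instanceLimits mirrors ingester.InstanceLimits for this standalone example.
type instanceLimits struct {
	MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
	MaxInMemoryTenants      int64   `yaml:"max_tenants"`
	MaxInMemorySeries       int64   `yaml:"max_series"`
	MaxInflightPushRequests int64   `yaml:"max_inflight_push_requests"`
}

// runtimeConfig mirrors the ingester_limits field added to runtimeConfigValues.
type runtimeConfig struct {
	IngesterLimits instanceLimits `yaml:"ingester_limits"`
}

func main() {
	// Example runtime-config document (values are illustrative). When present,
	// it overrides the flag-based defaults for the whole ingester instance.
	doc := []byte(`
ingester_limits:
  max_ingestion_rate: 20000
  max_tenants: 1000
  max_series: 1500000
  max_inflight_push_requests: 30000
`)

	var cfg runtimeConfig
	if err := yaml.Unmarshal(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg.IngesterLimits)
}
```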
30 changes: 28 additions & 2 deletions pkg/ingester/ingester.go
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"

@@ -93,6 +94,9 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error)
}
@@ -121,6 +125,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default and this config option removed.")

f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
}

// Ingester deals with "in flight" chunks. Based on Prometheus 1.x
@@ -167,6 +176,10 @@ type Ingester struct {

// Prometheus block storage
TSDBState TSDBState

// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *ewmaRate
inflightPushRequests atomic.Int64
}

// ChunkStore is the interface we need to store chunks
@@ -176,6 +189,8 @@ type ChunkStore interface {

// New constructs a new Ingester.
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
defaultInstanceLimits = &cfg.DefaultLimits

if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.MakeIngesterClient
}
@@ -209,7 +224,6 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled),

limits: limits,
chunkStore: chunkStore,
@@ -219,6 +233,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
registerer: registerer,
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, nil, &i.inflightPushRequests)

var err error
// During WAL recovery, it will create new user states which requires the limiter.
@@ -301,14 +316,14 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid

i := &Ingester{
cfg: cfg,
metrics: newIngesterMetrics(registerer, true, false),
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
wal: &noopWAL{},
limits: limits,
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, true, false, i.getInstanceLimits, nil, &i.inflightPushRequests)

i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping)
return i, nil
@@ -444,6 +459,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return nil, err
}

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
defer i.inflightPushRequests.Dec()

gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightPushRequests > 0 {
if inflight > gl.MaxInflightPushRequests {
return nil, errTooManyInflightPushRequests
}
}

if i.cfg.BlocksStorageEnabled {
return i.v2Push(ctx, req)
}
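The `Push` hunk above counts the incoming request before checking the limit, so the request that trips the limit is included in the inflight count, and `defer` guarantees the counter is decremented on every return path. A standalone sketch of the same pattern, using the `go.uber.org/atomic` package this file imports:

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

var errTooManyInflight = errors.New("too many inflight push requests")

type pushGate struct {
	inflight    atomic.Int64
	maxInflight int64 // 0 = unlimited
}

// push mirrors the guard added to Ingester.Push: increment first, then reject
// if the incremented count exceeds the limit; always decrement on return.
func (g *pushGate) push() error {
	inflight := g.inflight.Inc()
	defer g.inflight.Dec()

	if g.maxInflight > 0 && inflight > g.maxInflight {
		return errTooManyInflight
	}

	// ... handle the write request here ...
	return nil
}

func main() {
	g := &pushGate{maxInflight: 2}
	g.inflight.Store(2)   // pretend two pushes are already in flight
	fmt.Println(g.push()) // this third concurrent push is rejected
}
```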
70 changes: 68 additions & 2 deletions pkg/ingester/ingester_v2.go
@@ -105,6 +105,9 @@ type userTSDB struct {
seriesInMetric *metricCounter
limiter *Limiter

instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
instanceLimitsFn func() *InstanceLimits

stateMtx sync.RWMutex
state tsdbState
pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping.
@@ -214,6 +217,14 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return nil
}

// Verify ingester's global limit
gl := u.instanceLimitsFn()
if gl != nil && gl.MaxInMemorySeries > 0 {
if series := u.instanceSeriesCount.Load(); series >= gl.MaxInMemorySeries {
return errMaxSeriesLimitReached
}
}

// Total series limit.
if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil {
return err
@@ -233,6 +244,8 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {

// PostCreation implements SeriesLifecycleCallback interface.
func (u *userTSDB) PostCreation(metric labels.Labels) {
u.instanceSeriesCount.Inc()

metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
// This should never happen because it has already been checked in PreCreation().
@@ -243,6 +256,8 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {

// PostDeletion implements SeriesLifecycleCallback interface.
func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
u.instanceSeriesCount.Sub(int64(len(metrics)))

for _, metric := range metrics {
metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
@@ -377,6 +392,9 @@ type TSDBState struct {
// Timeout chosen for idle compactions.
compactionIdleTimeout time.Duration

// Number of series in memory, across all tenants.
seriesCount atomic.Int64

// Head compactions metrics.
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
@@ -449,14 +467,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled),
limits: limits,
chunkStore: nil,
usersMetadata: map[string]*userMetricsMetadata{},
wal: &noopWAL{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: newEWMARate(0.2, cfg.RateUpdatePeriod),
}
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
@@ -511,11 +530,11 @@ func NewV2ForFlusher(cfg Config, limits *validation.Overrides, registerer promet
i := &Ingester{
cfg: cfg,
limits: limits,
metrics: newIngesterMetrics(registerer, false, false),
wal: &noopWAL{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)

i.TSDBState.shipperIngesterID = "flusher"

@@ -613,6 +632,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

ingestionRateTicker := time.NewTicker(1 * time.Second)
defer ingestionRateTicker.Stop()

var activeSeriesTickerChan <-chan time.Time
if i.cfg.ActiveSeriesMetricsEnabled {
t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod)
@@ -628,6 +650,8 @@
select {
case <-metadataPurgeTicker.C:
i.purgeUserMetricsMetadata()
case <-ingestionRateTicker.C:
i.ingestionRate.tick()
case <-rateUpdateTicker.C:
i.userStatesMtx.RLock()
for _, db := range i.TSDBState.dbs {
@@ -680,6 +704,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
return nil, err
}

il := i.getInstanceLimits()
if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.rate(); rate >= il.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
return nil, wrapWithUser(err, userID)
@@ -841,6 +872,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}

i.ingestionRate.add(int64(succeededSamplesCount))

switch req.Source {
case cortexpb.RULE:
db.ingestedRuleSamples.add(int64(succeededSamplesCount))
@@ -1381,6 +1414,13 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState)
}

gl := i.getInstanceLimits()
if gl != nil && gl.MaxInMemoryTenants > 0 {
if users := int64(len(i.TSDBState.dbs)); users >= gl.MaxInMemoryTenants {
return nil, errMaxUsersLimitReached
}
}

// Create the database and a shipper for a user
db, err := i.createTSDB(userID)
if err != nil {
@@ -1408,6 +1448,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
}

// Create a new user database
@@ -1876,6 +1919,11 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
tenantDeleted = true
}

// At this point there are no more pushes to TSDB, and no possible compaction. Normally TSDB is empty,
// but if we're closing TSDB because of tenant deletion mark, then it may still contain some series.
// We need to remove these series from series count.
i.TSDBState.seriesCount.Sub(int64(userDB.Head().NumSeries()))

dir := userDB.db.Dir()

if err := userDB.Close(); err != nil {
@@ -2031,3 +2079,21 @@ func wrappedTSDBIngestErr(ingestErr error, timestamp model.Time, labels []cortex

return fmt.Errorf(errTSDBIngest, ingestErr, timestamp.Time().UTC().Format(time.RFC3339Nano), cortexpb.FromLabelAdaptersToLabels(labels).String())
}

func (i *Ingester) getInstanceLimits() *InstanceLimits {
// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
if i.State() == services.Starting {
return nil
}

if i.cfg.InstanceLimitsFn == nil {
return defaultInstanceLimits
}

l := i.cfg.InstanceLimitsFn()
if l == nil {
return defaultInstanceLimits
}

return l
}
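Taken together, the hunks in this file keep a single series counter per ingester: every `userTSDB` shares a pointer to `TSDBState.seriesCount`, `PostCreation` increments it, `PostDeletion` subtracts deleted series, `PreCreation` checks it against `max_series`, and closing a non-empty TSDB (e.g. after a tenant deletion mark) subtracts whatever is still in the head. A condensed sketch of that lifecycle, with simplified names:

```go
package ingester

import (
	"errors"

	"go.uber.org/atomic"
)

// Placeholder for the sentinel error used in PreCreation; message assumed.
var errMaxSeriesLimitReached = errors.New("ingester max series limit reached")

// seriesAccounting condenses the bookkeeping spread across PreCreation,
// PostCreation, PostDeletion and closeAndDeleteUserTSDBIfIdle above.
type seriesAccounting struct {
	instanceSeriesCount *atomic.Int64 // shared: points at TSDBState.seriesCount
	maxSeries           int64         // 0 = unlimited
}

// beforeCreate corresponds to PreCreation: refuse new series once at the limit.
func (a *seriesAccounting) beforeCreate() error {
	if a.maxSeries > 0 && a.instanceSeriesCount.Load() >= a.maxSeries {
		return errMaxSeriesLimitReached
	}
	return nil
}

// created and deleted correspond to PostCreation and PostDeletion.
func (a *seriesAccounting) created()      { a.instanceSeriesCount.Inc() }
func (a *seriesAccounting) deleted(n int) { a.instanceSeriesCount.Sub(int64(n)) }

// closed corresponds to the fix for closing a non-empty TSDB: series still in
// the head when the tenant's TSDB is closed must be removed from the count.
func (a *seriesAccounting) closed(remainingHeadSeries uint64) {
	a.instanceSeriesCount.Sub(int64(remainingHeadSeries))
}
```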
