Ingester bounds #3992

Merged Apr 9, 2021 · 27 commits
82d0a51
Added global ingester limits.
pstibrany Mar 22, 2021
edd0587
Add tests for global limits.
pstibrany Mar 22, 2021
023a25e
Expose current limits used by ingester via metrics.
pstibrany Mar 22, 2021
baabc5f
Add max inflight requests limit.
pstibrany Mar 22, 2021
ffc8155
Added test for inflight push requests.
pstibrany Mar 22, 2021
635e218
Docs.
pstibrany Mar 22, 2021
90390ef
Debug log.
pstibrany Mar 22, 2021
e37a216
Test for unmarshalling.
pstibrany Mar 22, 2021
82b14c2
Nil default global limits.
pstibrany Mar 22, 2021
e97bef6
CHANGELOG.md
pstibrany Mar 22, 2021
97af696
Expose current ingestion rate as gauge.
pstibrany Mar 24, 2021
e870730
Expose number of inflight requests.
pstibrany Mar 24, 2021
2360350
Change ewmaRate to use RWMutex.
pstibrany Mar 30, 2021
0a68912
Rename globalLimits to instanceLimits.
pstibrany Mar 30, 2021
b453a1e
Rename globalLimits to instanceLimits, fix users -> tenants, explain …
pstibrany Mar 30, 2021
37be0e6
Rename globalLimits to instanceLimits, fix users -> tenants, explain …
pstibrany Mar 30, 2021
3be949c
Remove details from error messages.
pstibrany Mar 30, 2021
69a6a63
Comment.
pstibrany Mar 30, 2021
47b93ce
Fix series count when closing non-empty TSDB.
pstibrany Mar 30, 2021
202f1f4
Added new failure modes to benchmark.
pstibrany Mar 31, 2021
809e4c0
Fixed docs.
pstibrany Mar 31, 2021
34266ad
Tick every second.
pstibrany Mar 31, 2021
e20cd3b
Fix CHANGELOG.md
pstibrany Mar 31, 2021
b059e8e
Review feedback.
pstibrany Mar 31, 2021
ca04f26
Review feedback.
pstibrany Mar 31, 2021
6693661
Remove forgotten fmt.Println.
pstibrany Apr 9, 2021
2581afa
Use error variables.
pstibrany Apr 9, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@
* [ENHANCEMENT] Allow use of `y|w|d` suffixes for duration related limits and per-tenant limits. #4044
* [ENHANCEMENT] Query-frontend: Small optimization on top of PR #3968 to avoid unnecessary Extents merging. #4026
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
* [ENHANCEMENT] Ingester: added the following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of tenants in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight push requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only applied when using blocks storage. They can also be configured via the runtime-config feature, and the current values are exported as the `cortex_ingester_instance_limits` metric. #3992
* [BUGFIX] Ruler-API: fix bug where the `/api/v1/rules/<namespace>/<group_name>` endpoint returned `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
25 changes: 25 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -771,6 +771,31 @@ lifecycler:
# After what time a series is considered to be inactive.
# CLI flag: -ingester.active-series-metrics-idle-timeout
[active_series_metrics_idle_timeout: <duration> | default = 10m]

instance_limits:
# Max ingestion rate (samples/sec) that ingester will accept. This limit is
# per-ingester, not per-tenant. Additional push requests will be rejected.
# Current ingestion rate is computed as exponentially weighted moving average,
# updated every second. This limit only works when using blocks engine. 0 =
# unlimited.
# CLI flag: -ingester.instance-limits.max-ingestion-rate
[max_ingestion_rate: <float> | default = 0]

# Max users that this ingester can hold. Requests from additional users will
# be rejected. This limit only works when using blocks engine. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-tenants
[max_tenants: <int> | default = 0]

# Max series that this ingester can hold (across all tenants). Requests to
# create additional series will be rejected. This limit only works when using
# blocks engine. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-series
[max_series: <int> | default = 0]

# Max inflight push requests that this ingester can handle (across all
# tenants). Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
```
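Per the CHANGELOG entry, the same four limits can also be supplied through the runtime-config file and reloaded without a restart (wired up in `pkg/cortex/runtime_config.go` below). A minimal sketch — the `ingester_limits` key comes from the PR, but the nested field names and values here are assumptions based on the flag names:

```yaml
# Runtime-config sketch (field names assumed to match the CLI flags; 0 leaves a limit disabled).
ingester_limits:
  max_ingestion_rate: 20000
  max_tenants: 1000
  max_series: 1500000
  max_inflight_push_requests: 30000
```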

### `querier_config`
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -424,6 +424,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger)
16 changes: 16 additions & 0 deletions pkg/cortex/runtime_config.go
@@ -27,6 +27,8 @@ type runtimeConfigValues struct {
Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`

IngesterLimits ingester.InstanceLimits `yaml:"ingester_limits"`
}

// runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
@@ -124,6 +126,20 @@ func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.Quer
}
}

func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits {
if manager == nil {
return nil
}

return func() *ingester.InstanceLimits {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
return &cfg.IngesterLimits
}
return nil
}
}
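The closure returned by `ingesterInstanceLimits` is consumed by the ingester's `getInstanceLimits` (shown in `pkg/ingester/ingester_v2.go` further down). The resulting precedence is: runtime-config value if the callback is set and returns one, otherwise the flag-derived defaults. A standalone sketch of that resolution — struct fields match the PR's `InstanceLimits`, but the type and helper names here are illustrative, not code from the PR:

```go
package main

// instanceLimits mirrors the PR's InstanceLimits fields.
type instanceLimits struct {
	MaxIngestionRate        float64
	MaxInMemoryTenants      int64
	MaxInMemorySeries       int64
	MaxInflightPushRequests int64
}

// resolveLimits applies the override order used by the ingester: a
// runtime-config callback, when set and returning a value, wins over the
// flag-derived defaults.
func resolveLimits(defaults *instanceLimits, fn func() *instanceLimits) *instanceLimits {
	if fn == nil {
		return defaults // no runtime config wired in
	}
	if l := fn(); l != nil {
		return l // runtime-config value overrides the defaults
	}
	return defaults // runtime config present but has no ingester_limits block
}
```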

func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
30 changes: 28 additions & 2 deletions pkg/ingester/ingester.go
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"

@@ -93,6 +94,9 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error)
}
@@ -121,6 +125,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default and this config option removed.")

f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
}

// Ingester deals with "in flight" chunks. Based on Prometheus 1.x
@@ -167,6 +176,10 @@ type Ingester struct {

// Prometheus block storage
TSDBState TSDBState

// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *ewmaRate
inflightPushRequests atomic.Int64
}

// ChunkStore is the interface we need to store chunks
@@ -176,6 +189,8 @@ type ChunkStore interface {

// New constructs a new Ingester.
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
defaultInstanceLimits = &cfg.DefaultLimits

if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.MakeIngesterClient
}
@@ -209,7 +224,6 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled),

limits: limits,
chunkStore: chunkStore,
@@ -219,6 +233,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
registerer: registerer,
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, nil, &i.inflightPushRequests)

var err error
// During WAL recovery, it will create new user states which requires the limiter.
@@ -301,14 +316,14 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid

i := &Ingester{
cfg: cfg,
metrics: newIngesterMetrics(registerer, true, false),
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
wal: &noopWAL{},
limits: limits,
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, true, false, i.getInstanceLimits, nil, &i.inflightPushRequests)

i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping)
return i, nil
@@ -444,6 +459,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return nil, err
}

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
defer i.inflightPushRequests.Dec()

gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightPushRequests > 0 {
if inflight > gl.MaxInflightPushRequests {
return nil, errTooManyInflightPushRequests
}
}

if i.cfg.BlocksStorageEnabled {
return i.v2Push(ctx, req)
}
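The inflight check above counts the current request before comparing against the limit, which is why the comment says the error reports *this* request too. A standalone sketch of the same pattern — the PR uses `go.uber.org/atomic`, while this sketch uses the standard library's `sync/atomic` (Go 1.19+), and the `pushServer` type is illustrative:

```go
package main

import (
	"errors"
	"sync/atomic"
)

var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester")

// pushServer mimics the ingester's inflight accounting: the request is
// counted first, then checked, so the rejection reflects the request itself.
type pushServer struct {
	inflight    atomic.Int64
	maxInflight int64 // 0 = unlimited
}

func (s *pushServer) push(handle func()) error {
	inflight := s.inflight.Add(1)
	defer s.inflight.Add(-1) // always undo the count, even on rejection

	if s.maxInflight > 0 && inflight > s.maxInflight {
		return errTooManyInflightPushRequests
	}
	handle()
	return nil
}
```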
70 changes: 68 additions & 2 deletions pkg/ingester/ingester_v2.go
@@ -105,6 +105,9 @@ type userTSDB struct {
seriesInMetric *metricCounter
limiter *Limiter

instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester.
instanceLimitsFn func() *InstanceLimits

stateMtx sync.RWMutex
state tsdbState
pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping.
@@ -214,6 +217,14 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return nil
}

// Verify ingester's global limit
gl := u.instanceLimitsFn()
if gl != nil && gl.MaxInMemorySeries > 0 {
if series := u.instanceSeriesCount.Load(); series >= gl.MaxInMemorySeries {
return errMaxSeriesLimitReached
}
}

// Total series limit.
if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil {
return err
@@ -233,6 +244,8 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {

// PostCreation implements SeriesLifecycleCallback interface.
func (u *userTSDB) PostCreation(metric labels.Labels) {
u.instanceSeriesCount.Inc()

metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
// This should never happen because it has already been checked in PreCreation().
@@ -243,6 +256,8 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {

// PostDeletion implements SeriesLifecycleCallback interface.
func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
u.instanceSeriesCount.Sub(int64(len(metrics)))

for _, metric := range metrics {
metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
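The `PreCreation`/`PostCreation`/`PostDeletion` hooks above maintain one series counter per ingester: every `userTSDB` holds a pointer to the same `atomic.Int64` (`TSDBState.seriesCount`), so the limit applies across all tenants. A reduced sketch of that sharing — names are illustrative, and it uses the stdlib `sync/atomic` rather than `go.uber.org/atomic`:

```go
package main

import (
	"errors"
	"sync/atomic"
)

var errMaxSeriesLimitReached = errors.New("max series limit reached")

// tenantDB stands in for userTSDB: each tenant gets its own struct, but all
// of them point at the single ingester-wide counter.
type tenantDB struct {
	seriesCount *atomic.Int64 // shared across all tenants
	maxSeries   int64         // 0 = unlimited
}

// newTenantDBs returns n tenant handles sharing one counter.
func newTenantDBs(n int, maxSeries int64) []*tenantDB {
	shared := &atomic.Int64{}
	dbs := make([]*tenantDB, n)
	for i := range dbs {
		dbs[i] = &tenantDB{seriesCount: shared, maxSeries: maxSeries}
	}
	return dbs
}

// preCreation rejects a new series once the instance-wide count hits the cap.
func (t *tenantDB) preCreation() error {
	if t.maxSeries > 0 && t.seriesCount.Load() >= t.maxSeries {
		return errMaxSeriesLimitReached
	}
	return nil
}

func (t *tenantDB) postCreation()      { t.seriesCount.Add(1) }
func (t *tenantDB) postDeletion(n int) { t.seriesCount.Add(int64(-n)) }
```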
@@ -377,6 +392,9 @@ type TSDBState struct {
// Timeout chosen for idle compactions.
compactionIdleTimeout time.Duration

// Number of series in memory, across all tenants.
seriesCount atomic.Int64

// Head compactions metrics.
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
@@ -449,14 +467,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled),
limits: limits,
chunkStore: nil,
usersMetadata: map[string]*userMetricsMetadata{},
wal: &noopWAL{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: newEWMARate(0.2, cfg.RateUpdatePeriod),
}
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)

// Replace specific metrics which we can't directly track but we need to read
// them from the underlying system (ie. TSDB).
@@ -511,11 +530,11 @@ func NewV2ForFlusher(cfg Config, limits *validation.Overrides, registerer promet
i := &Ingester{
cfg: cfg,
limits: limits,
metrics: newIngesterMetrics(registerer, false, false),
wal: &noopWAL{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
}
i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)

i.TSDBState.shipperIngesterID = "flusher"

@@ -613,6 +632,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

ingestionRateTicker := time.NewTicker(1 * time.Second)
defer ingestionRateTicker.Stop()

var activeSeriesTickerChan <-chan time.Time
if i.cfg.ActiveSeriesMetricsEnabled {
t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod)
@@ -628,6 +650,8 @@
select {
case <-metadataPurgeTicker.C:
i.purgeUserMetricsMetadata()
case <-ingestionRateTicker.C:
i.ingestionRate.tick()
case <-rateUpdateTicker.C:
i.userStatesMtx.RLock()
for _, db := range i.TSDBState.dbs {
@@ -680,6 +704,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
return nil, err
}

il := i.getInstanceLimits()
if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.rate(); rate >= il.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
return nil, wrapWithUser(err, userID)
@@ -841,6 +872,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}

i.ingestionRate.add(int64(succeededSamplesCount))

switch req.Source {
case cortexpb.RULE:
db.ingestedRuleSamples.add(int64(succeededSamplesCount))
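`ingestionRate` is the `ewmaRate` that `updateLoop` ticks every second: `add` accumulates succeeded samples, `tick` folds them into an exponentially weighted moving average, and `v2Push` rejects once `rate()` exceeds `MaxIngestionRate`. A self-contained sketch of such a tracker, guarded by an `RWMutex` as in commit 2360350 — the exact EWMA formula in Cortex may differ, this is one common form:

```go
package main

import "sync"

// ewmaRate keeps an exponentially weighted moving average of a counter's
// rate; tick() is meant to be called on a fixed interval (1s in this PR).
type ewmaRate struct {
	mtx       sync.RWMutex
	newEvents int64
	alpha     float64 // smoothing factor, e.g. 0.2
	interval  float64 // seconds between ticks
	lastRate  float64
	init      bool
}

func newEWMARate(alpha, intervalSeconds float64) *ewmaRate {
	return &ewmaRate{alpha: alpha, interval: intervalSeconds}
}

// add records events (succeeded samples) since the last tick.
func (r *ewmaRate) add(n int64) {
	r.mtx.Lock()
	r.newEvents += n
	r.mtx.Unlock()
}

// tick folds the interval's instant rate into the moving average.
func (r *ewmaRate) tick() {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	instant := float64(r.newEvents) / r.interval
	r.newEvents = 0
	if r.init {
		r.lastRate += r.alpha * (instant - r.lastRate)
	} else {
		r.init = true
		r.lastRate = instant // first tick seeds the average
	}
}

// rate takes only a read lock, so hot push paths can check it cheaply.
func (r *ewmaRate) rate() float64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	return r.lastRate
}
```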
@@ -1381,6 +1414,13 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState)
}

gl := i.getInstanceLimits()
if gl != nil && gl.MaxInMemoryTenants > 0 {
if users := int64(len(i.TSDBState.dbs)); users >= gl.MaxInMemoryTenants {
return nil, errMaxUsersLimitReached
}
}

// Create the database and a shipper for a user
db, err := i.createTSDB(userID)
if err != nil {
@@ -1408,6 +1448,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
}

// Create a new user database
@@ -1876,6 +1919,11 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
tenantDeleted = true
}

// At this point there are no more pushes to TSDB, and no possible compaction. Normally TSDB is empty,
// but if we're closing TSDB because of tenant deletion mark, then it may still contain some series.
// We need to remove these series from series count.
i.TSDBState.seriesCount.Sub(int64(userDB.Head().NumSeries()))

dir := userDB.db.Dir()

if err := userDB.Close(); err != nil {
@@ -2031,3 +2079,21 @@ func wrappedTSDBIngestErr(ingestErr error, timestamp model.Time, labels []cortex

return fmt.Errorf(errTSDBIngest, ingestErr, timestamp.Time().UTC().Format(time.RFC3339Nano), cortexpb.FromLabelAdaptersToLabels(labels).String())
}

func (i *Ingester) getInstanceLimits() *InstanceLimits {
// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
if i.State() == services.Starting {
return nil
}

if i.cfg.InstanceLimitsFn == nil {
return defaultInstanceLimits
}

l := i.cfg.InstanceLimitsFn()
if l == nil {
return defaultInstanceLimits
}

return l
}