From 7f85a26f41bbb6848bb433b8a256ec9a73b03a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 9 Apr 2021 15:57:45 +0200 Subject: [PATCH] Ingester bounds (#3992) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added global ingester limits. Signed-off-by: Peter Štibraný * Add tests for global limits. Signed-off-by: Peter Štibraný * Expose current limits used by ingester via metrics. Signed-off-by: Peter Štibraný * Add max inflight requests limit. Signed-off-by: Peter Štibraný * Added test for inflight push requests. Signed-off-by: Peter Štibraný * Docs. Signed-off-by: Peter Štibraný * Debug log. Signed-off-by: Peter Štibraný * Test for unmarshalling. Signed-off-by: Peter Štibraný * Nil default global limits. Signed-off-by: Peter Štibraný * CHANGELOG.md Signed-off-by: Peter Štibraný * Expose current ingestion rate as gauge. Signed-off-by: Peter Štibraný * Expose number of inflight requests. Signed-off-by: Peter Štibraný * Change ewmaRate to use RWMutex. Signed-off-by: Peter Štibraný * Rename globalLimits to instanceLimits. Rename max_users to max_tenants. Removed extra parameter to `getOrCreateTSDB` Signed-off-by: Peter Štibraný * Rename globalLimits to instanceLimits, fix users -> tenants, explain that these limits only work when using blocks engine. Signed-off-by: Peter Štibraný * Rename globalLimits to instanceLimits, fix users -> tenants, explain that these limits only work when using blocks engine. Signed-off-by: Peter Štibraný * Remove details from error messages. Signed-off-by: Peter Štibraný * Comment. Signed-off-by: Peter Štibraný * Fix series count when closing non-empty TSDB. Signed-off-by: Peter Štibraný * Added new failure modes to benchmark. Signed-off-by: Peter Štibraný * Fixed docs. Signed-off-by: Peter Štibraný * Tick every second. Signed-off-by: Peter Štibraný * Fix CHANGELOG.md Signed-off-by: Peter Štibraný * Review feedback. Signed-off-by: Peter Štibraný * Review feedback. Signed-off-by: Peter Štibraný * Remove forgotten fmt.Println. Signed-off-by: Peter Štibraný * Use error variables. Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 25 + pkg/cortex/modules.go | 1 + pkg/cortex/runtime_config.go | 16 + pkg/ingester/ingester.go | 30 +- pkg/ingester/ingester_v2.go | 70 ++- pkg/ingester/ingester_v2_test.go | 568 +++++++++++++++++--- pkg/ingester/instance_limits.go | 32 ++ pkg/ingester/instance_limits_test.go | 29 + pkg/ingester/metrics.go | 81 ++- pkg/ingester/rate.go | 19 +- 11 files changed, 786 insertions(+), 86 deletions(-) create mode 100644 pkg/ingester/instance_limits.go create mode 100644 pkg/ingester/instance_limits_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a6a7f2e6b2..23585863fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Allow use of `y|w|d` suffixes for duration related limits and per-tenant limits. #4044 * [ENHANCEMENT] Query-frontend: Small optimization on top of PR #3968 to avoid unnecessary Extents merging. #4026 * [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. 
#4040 +* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992. * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6808c9fb62..30af7c8a80 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -771,6 +771,31 @@ lifecycler: # After what time a series is considered to be inactive. # CLI flag: -ingester.active-series-metrics-idle-timeout [active_series_metrics_idle_timeout: | default = 10m] + +instance_limits: + # Max ingestion rate (samples/sec) that ingester will accept. This limit is + # per-ingester, not per-tenant. Additional push requests will be rejected. + # Current ingestion rate is computed as exponentially weighted moving average, + # updated every second. This limit only works when using blocks engine. 0 = + # unlimited. + # CLI flag: -ingester.instance-limits.max-ingestion-rate + [max_ingestion_rate: | default = 0] + + # Max users that this ingester can hold. Requests from additional users will + # be rejected. This limit only works when using blocks engine. 0 = unlimited. + # CLI flag: -ingester.instance-limits.max-tenants + [max_tenants: | default = 0] + + # Max series that this ingester can hold (across all tenants). Requests to + # create additional series will be rejected. This limit only works when using + # blocks engine. 0 = unlimited. + # CLI flag: -ingester.instance-limits.max-series + [max_series: | default = 0] + + # Max inflight push requests that this ingester can handle (across all + # tenants). Additional requests will be rejected. 0 = unlimited. 
+ # CLI flag: -ingester.instance-limits.max-inflight-push-requests + [max_inflight_push_requests: | default = 0] ``` ### `querier_config` diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 2b4b1e6454..43fd0d87ab 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -424,6 +424,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig) + t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig) t.tsdbIngesterConfig() t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index e115198ba0..33f6f41f8b 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -27,6 +27,8 @@ type runtimeConfigValues struct { Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` + + IngesterLimits ingester.InstanceLimits `yaml:"ingester_limits"` } // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager @@ -124,6 +126,20 @@ func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.Quer } } +func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits { + if manager == nil { + return nil + } + + return func() *ingester.InstanceLimits { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return &cfg.IngesterLimits + } + return nil + } +} + func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 25d3b18dd8..1ff82cfa02 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" tsdb_record "github.com/prometheus/prometheus/tsdb/record" "github.com/weaveworks/common/httpgrpc" + "go.uber.org/atomic" "golang.org/x/time/rate" "google.golang.org/grpc/codes" @@ -93,6 +94,9 @@ type Config struct { DistributorShardingStrategy string `yaml:"-"` DistributorShardByAllLabels bool `yaml:"-"` + DefaultLimits InstanceLimits `yaml:"instance_limits"` + InstanceLimitsFn func() *InstanceLimits `yaml:"-"` + // For testing, you can override the address and ID of this ingester. ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error) } @@ -121,6 +125,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.") f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.") f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. 
Once ready, it will be made default and this config option removed.") + + f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. This limit only works when using blocks engine. 0 = unlimited.") + f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.") + f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.") + f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.") } // Ingester deals with "in flight" chunks. Based on Prometheus 1.x @@ -167,6 +176,10 @@ type Ingester struct { // Prometheus block storage TSDBState TSDBState + + // Rate of pushed samples. Only used by V2-ingester to limit global samples push rate. + ingestionRate *ewmaRate + inflightPushRequests atomic.Int64 } // ChunkStore is the interface we need to store chunks @@ -176,6 +189,8 @@ type ChunkStore interface { // New constructs a new Ingester. func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { + defaultInstanceLimits = &cfg.DefaultLimits + if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.MakeIngesterClient } @@ -209,7 +224,6 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled), limits: limits, chunkStore: chunkStore, @@ -219,6 +233,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c registerer: registerer, logger: logger, } + i.metrics = newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, nil, &i.inflightPushRequests) var err error // During WAL recovery, it will create new user states which requires the limiter. @@ -301,7 +316,6 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid i := &Ingester{ cfg: cfg, - metrics: newIngesterMetrics(registerer, true, false), chunkStore: chunkStore, flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), flushRateLimiter: rate.NewLimiter(rate.Inf, 1), @@ -309,6 +323,7 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid limits: limits, logger: logger, } + i.metrics = newIngesterMetrics(registerer, true, false, i.getInstanceLimits, nil, &i.inflightPushRequests) i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping) return i, nil @@ -444,6 +459,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte return nil, err } + // We will report *this* request in the error too. 
+ inflight := i.inflightPushRequests.Inc() + defer i.inflightPushRequests.Dec() + + gl := i.getInstanceLimits() + if gl != nil && gl.MaxInflightPushRequests > 0 { + if inflight > gl.MaxInflightPushRequests { + return nil, errTooManyInflightPushRequests + } + } + if i.cfg.BlocksStorageEnabled { return i.v2Push(ctx, req) } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 36da126106..bc0838d4fd 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -105,6 +105,9 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter + instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. + instanceLimitsFn func() *InstanceLimits + stateMtx sync.RWMutex state tsdbState pushesInFlight sync.WaitGroup // Increased with stateMtx read lock held, only if state == active or activeShipping. @@ -214,6 +217,14 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error { return nil } + // Verify ingester's global limit + gl := u.instanceLimitsFn() + if gl != nil && gl.MaxInMemorySeries > 0 { + if series := u.instanceSeriesCount.Load(); series >= gl.MaxInMemorySeries { + return errMaxSeriesLimitReached + } + } + // Total series limit. if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil { return err @@ -233,6 +244,8 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error { // PostCreation implements SeriesLifecycleCallback interface. func (u *userTSDB) PostCreation(metric labels.Labels) { + u.instanceSeriesCount.Inc() + metricName, err := extract.MetricNameFromLabels(metric) if err != nil { // This should never happen because it has already been checked in PreCreation(). @@ -243,6 +256,8 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { // PostDeletion implements SeriesLifecycleCallback interface. func (u *userTSDB) PostDeletion(metrics ...labels.Labels) { + u.instanceSeriesCount.Sub(int64(len(metrics))) + for _, metric := range metrics { metricName, err := extract.MetricNameFromLabels(metric) if err != nil { @@ -377,6 +392,9 @@ type TSDBState struct { // Timeout chosen for idle compactions. compactionIdleTimeout time.Duration + // Number of series in memory, across all tenants. + seriesCount atomic.Int64 + // Head compactions metrics. compactionsTriggered prometheus.Counter compactionsFailed prometheus.Counter @@ -449,14 +467,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled), limits: limits, chunkStore: nil, usersMetadata: map[string]*userMetricsMetadata{}, wal: &noopWAL{}, TSDBState: newTSDBState(bucketClient, registerer), logger: logger, + ingestionRate: newEWMARate(0.2, cfg.RateUpdatePeriod), } + i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests) // Replace specific metrics which we can't directly track but we need to read // them from the underlying system (ie. TSDB). 
@@ -511,11 +530,11 @@ func NewV2ForFlusher(cfg Config, limits *validation.Overrides, registerer promet i := &Ingester{ cfg: cfg, limits: limits, - metrics: newIngesterMetrics(registerer, false, false), wal: &noopWAL{}, TSDBState: newTSDBState(bucketClient, registerer), logger: logger, } + i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests) i.TSDBState.shipperIngesterID = "flusher" @@ -613,6 +632,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error { rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod) defer rateUpdateTicker.Stop() + ingestionRateTicker := time.NewTicker(1 * time.Second) + defer ingestionRateTicker.Stop() + var activeSeriesTickerChan <-chan time.Time if i.cfg.ActiveSeriesMetricsEnabled { t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod) @@ -628,6 +650,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { select { case <-metadataPurgeTicker.C: i.purgeUserMetricsMetadata() + case <-ingestionRateTicker.C: + i.ingestionRate.tick() case <-rateUpdateTicker.C: i.userStatesMtx.RLock() for _, db := range i.TSDBState.dbs { @@ -680,6 +704,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor return nil, err } + il := i.getInstanceLimits() + if il != nil && il.MaxIngestionRate > 0 { + if rate := i.ingestionRate.rate(); rate >= il.MaxIngestionRate { + return nil, errMaxSamplesPushRateLimitReached + } + } + db, err := i.getOrCreateTSDB(userID, false) if err != nil { return nil, wrapWithUser(err, userID) @@ -841,6 +872,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount)) } + i.ingestionRate.add(int64(succeededSamplesCount)) + switch req.Source { case cortexpb.RULE: db.ingestedRuleSamples.add(int64(succeededSamplesCount)) @@ -1381,6 +1414,13 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState) } + gl := i.getInstanceLimits() + if gl != nil && gl.MaxInMemoryTenants > 0 { + if users := int64(len(i.TSDBState.dbs)); users >= gl.MaxInMemoryTenants { + return nil, errMaxUsersLimitReached + } + } + // Create the database and a shipper for a user db, err := i.createTSDB(userID) if err != nil { @@ -1408,6 +1448,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { seriesInMetric: newMetricCounter(i.limiter), ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod), + + instanceLimitsFn: i.getInstanceLimits, + instanceSeriesCount: &i.TSDBState.seriesCount, } // Create a new user database @@ -1876,6 +1919,11 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes tenantDeleted = true } + // At this point there are no more pushes to TSDB, and no possible compaction. Normally TSDB is empty, + // but if we're closing TSDB because of tenant deletion mark, then it may still contain some series. + // We need to remove these series from series count. 
+ i.TSDBState.seriesCount.Sub(int64(userDB.Head().NumSeries())) + dir := userDB.db.Dir() if err := userDB.Close(); err != nil { @@ -2031,3 +2079,21 @@ func wrappedTSDBIngestErr(ingestErr error, timestamp model.Time, labels []cortex return fmt.Errorf(errTSDBIngest, ingestErr, timestamp.Time().UTC().Format(time.RFC3339Nano), cortexpb.FromLabelAdaptersToLabels(labels).String()) } + +func (i *Ingester) getInstanceLimits() *InstanceLimits { + // Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL. + if i.State() == services.Starting { + return nil + } + + if i.cfg.InstanceLimitsFn == nil { + return defaultInstanceLimits + } + + l := i.cfg.InstanceLimitsFn() + if l == nil { + return defaultInstanceLimits + } + + return l +} diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 8de413d473..3724783b68 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -12,6 +12,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -36,6 +37,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/chunk/encoding" @@ -601,7 +603,13 @@ func benchmarkIngesterV2Push(b *testing.B, limits validation.Limits, errorsExpec } } } +} +func verifyErrorString(tb testing.TB, err error, expectedErr string) { + if err == nil || !strings.Contains(err.Error(), expectedErr) { + tb.Helper() + tb.Fatalf("unexpected error. expected: %s actual: %v", expectedErr, err) + } } func Benchmark_Ingester_v2PushOnError(b *testing.B) { @@ -629,13 +637,19 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { }, } + instanceLimits := map[string]*InstanceLimits{ + "no limits": nil, + "limits set": {MaxIngestionRate: 1000, MaxInMemoryTenants: 1, MaxInMemorySeries: 1000, MaxInflightPushRequests: 1000}, // these match max values from scenarios + } + tests := map[string]struct { - prepareConfig func(limits *validation.Limits) + // If this returns false, test is skipped. + prepareConfig func(limits *validation.Limits, instanceLimits *InstanceLimits) bool beforeBenchmark func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) runBenchmark func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) }{ "out of bound samples": { - prepareConfig: func(limits *validation.Limits) {}, + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Push a single time series to set the TSDB min time. currTimeReq := cortexpb.ToWriteRequest( @@ -653,14 +667,12 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { for n := 0; n < b.N; n++ { _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck - if !strings.Contains(err.Error(), expectedErr) { - b.Fatalf("unexpected error. 
expected: %s actual: %s", expectedErr, err.Error()) - } + verifyErrorString(b, err, expectedErr) } }, }, "out of order samples": { - prepareConfig: func(limits *validation.Limits) {}, + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // For each series, push a single sample with a timestamp greater than next pushes. for i := 0; i < numSeriesPerRequest; i++ { @@ -681,23 +693,19 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { for n := 0; n < b.N; n++ { _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck - if !strings.Contains(err.Error(), expectedErr) { - b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error()) - } + verifyErrorString(b, err, expectedErr) } }, }, "per-user series limit reached": { - prepareConfig: func(limits *validation.Limits) { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { limits.MaxLocalSeriesPerUser = 1 + return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Push a series with a metric name different than the one used during the benchmark. - metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "another"}} - metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters) - currTimeReq := cortexpb.ToWriteRequest( - []labels.Labels{metricLabels}, + []labels.Labels{labels.FromStrings(labels.MetricName, "another")}, []cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}}, nil, cortexpb.API) @@ -705,29 +713,22 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { - expectedErr := "per-user series limit" - // Push series with a different name than the one already pushed. for n := 0; n < b.N; n++ { _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck - - if !strings.Contains(err.Error(), expectedErr) { - b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error()) - } + verifyErrorString(b, err, "per-user series limit") } }, }, "per-metric series limit reached": { - prepareConfig: func(limits *validation.Limits) { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { limits.MaxLocalSeriesPerMetric = 1 + return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Push a series with the same metric name but different labels than the one used during the benchmark. - metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: "another"}} - metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters) - currTimeReq := cortexpb.ToWriteRequest( - []labels.Labels{metricLabels}, + []labels.Labels{labels.FromStrings(labels.MetricName, metricName, "cardinality", "another")}, []cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}}, nil, cortexpb.API) @@ -735,15 +736,92 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { - expectedErr := "per-metric series limit" - // Push series with different labels than the one already pushed. 
for n := 0; n < b.N; n++ { _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck + verifyErrorString(b, err, "per-metric series limit") + } + }, + }, + "very low ingestion rate limit": { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { + if instanceLimits == nil { + return false + } + instanceLimits.MaxIngestionRate = 0.00001 // very low + return true + }, + beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { + // Send a lot of samples + _, err := ingester.v2Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + require.NoError(b, err) - if !strings.Contains(err.Error(), expectedErr) { - b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error()) - } + ingester.ingestionRate.tick() + }, + runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { + // Push series with different labels than the one already pushed. + for n := 0; n < b.N; n++ { + _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) + verifyErrorString(b, err, "push rate reached") + } + }, + }, + "max number of tenants reached": { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { + if instanceLimits == nil { + return false + } + instanceLimits.MaxInMemoryTenants = 1 + return true + }, + beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { + // Send some samples for one tenant (not the same that is used during the test) + ctx := user.InjectOrgID(context.Background(), "different_tenant") + _, err := ingester.v2Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + require.NoError(b, err) + }, + runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { + // Push series with different labels than the one already pushed. 
+ for n := 0; n < b.N; n++ { + _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) + verifyErrorString(b, err, "max tenants limit reached") + } + }, + }, + "max number of series reached": { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { + if instanceLimits == nil { + return false + } + instanceLimits.MaxInMemorySeries = 1 + return true + }, + beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { + _, err := ingester.v2Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + require.NoError(b, err) + }, + runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { + for n := 0; n < b.N; n++ { + _, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) + verifyErrorString(b, err, "max series limit reached") + } + }, + }, + "max inflight requests reached": { + prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { + if instanceLimits == nil { + return false + } + instanceLimits.MaxInflightPushRequests = 1 + return true + }, + beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { + ingester.inflightPushRequests.Inc() + }, + runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { + for n := 0; n < b.N; n++ { + _, err := ingester.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) + verifyErrorString(b, err, "too many inflight push requests") } }, }, @@ -751,57 +829,73 @@ func Benchmark_Ingester_v2PushOnError(b *testing.B) { for testName, testData := range tests { for scenarioName, scenario := range scenarios { - b.Run(fmt.Sprintf("failure: %s, scenario: %s", testName, scenarioName), func(b *testing.B) { - registry := prometheus.NewRegistry() - - // Create a mocked ingester - cfg := defaultIngesterTestConfig() - cfg.LifecyclerConfig.JoinAfter = 0 - - limits := defaultLimitsTestConfig() - testData.prepareConfig(&limits) + for limitsName, limits := range instanceLimits { + b.Run(fmt.Sprintf("failure: %s, scenario: %s, limits: %s", testName, scenarioName, limitsName), func(b *testing.B) { + registry := prometheus.NewRegistry() + + instanceLimits := limits + if instanceLimits != nil { + // make a copy, to avoid changing value in the instanceLimits map. + newLimits := &InstanceLimits{} + *newLimits = *instanceLimits + instanceLimits = newLimits + } - ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, "", registry) - require.NoError(b, err) - require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester)) - defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck + // Create a mocked ingester + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 - // Wait until the ingester is ACTIVE - test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} { - return ingester.lifecycler.GetState() - }) + limits := defaultLimitsTestConfig() + if !testData.prepareConfig(&limits, instanceLimits) { + b.SkipNow() + } - testData.beforeBenchmark(b, ingester, scenario.numSeriesPerRequest) + cfg.InstanceLimitsFn = func() *InstanceLimits { + return instanceLimits + } - // Prepare the request. 
- metrics := make([]labels.Labels, 0, scenario.numSeriesPerRequest) - samples := make([]cortexpb.Sample, 0, scenario.numSeriesPerRequest) - for i := 0; i < scenario.numSeriesPerRequest; i++ { - metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}}) - samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: sampleTimestamp}) - } + ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, "", registry) + require.NoError(b, err) + require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester)) + defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return ingester.lifecycler.GetState() + }) + + testData.beforeBenchmark(b, ingester, scenario.numSeriesPerRequest) + + // Prepare the request. + metrics := make([]labels.Labels, 0, scenario.numSeriesPerRequest) + samples := make([]cortexpb.Sample, 0, scenario.numSeriesPerRequest) + for i := 0; i < scenario.numSeriesPerRequest; i++ { + metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}}) + samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: sampleTimestamp}) + } - // Run the benchmark. - wg := sync.WaitGroup{} - wg.Add(scenario.numConcurrentClients) - start := make(chan struct{}) + // Run the benchmark. + wg := sync.WaitGroup{} + wg.Add(scenario.numConcurrentClients) + start := make(chan struct{}) - b.ReportAllocs() - b.ResetTimer() + b.ReportAllocs() + b.ResetTimer() - for c := 0; c < scenario.numConcurrentClients; c++ { - go func() { - defer wg.Done() - <-start + for c := 0; c < scenario.numConcurrentClients; c++ { + go func() { + defer wg.Done() + <-start - testData.runBenchmark(b, ingester, metrics, samples) - }() - } + testData.runBenchmark(b, ingester, metrics, samples) + }() + } - b.ResetTimer() - close(start) - wg.Wait() - }) + b.ResetTimer() + close(start) + wg.Wait() + }) + } } } } @@ -2122,7 +2216,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) }) pushSingleSampleWithMetadata(t, i) + require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) i.compactBlocks(context.Background(), true) + require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) i.shipBlocks(context.Background()) numObjects := len(bucket.Objects()) @@ -2137,7 +2233,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) // After writing tenant deletion mark, pushSingleSampleWithMetadata(t, i) + require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) i.compactBlocks(context.Background(), true) + require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) i.shipBlocks(context.Background()) numObjectsAfterMarkingTenantForDeletion := len(bucket.Objects()) @@ -2145,6 +2243,49 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) require.Equal(t, tsdbTenantMarkedForDeletion, i.closeAndDeleteUserTSDBIfIdle(userID)) } +func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2 + + // Create ingester + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + + // Use in-memory bucket. 
+ bucket := objstore.NewInMemBucket() + + // Write tenant deletion mark. + require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, nil, cortex_tsdb.NewTenantDeletionMark(time.Now()))) + + i.TSDBState.bucket = bucket + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + pushSingleSampleWithMetadata(t, i) + require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) + + // We call shipBlocks to check for deletion marker (it happens inside this method). + i.shipBlocks(context.Background()) + + // Verify that tenant deletion mark was found. + db := i.getTSDB(userID) + require.NotNil(t, db) + require.True(t, db.deletionMarkFound.Load()) + + // If we try to close TSDB now, it should succeed, even though TSDB is not idle and empty. + require.Equal(t, uint64(1), db.Head().NumSeries()) + require.Equal(t, tsdbTenantMarkedForDeletion, i.closeAndDeleteUserTSDBIfIdle(userID)) + + // Closing should decrease series count. + require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) +} + func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInProgress(t *testing.T) { ctx := context.Background() cfg := defaultIngesterTestConfig() @@ -2809,6 +2950,8 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) { pushSingleSampleWithMetadata(t, i) i.v2UpdateActiveSeries() + require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) + metricsToCheck := []string{memSeriesCreatedTotalName, memSeriesRemovedTotalName, "cortex_ingester_memory_users", "cortex_ingester_active_series", "cortex_ingester_memory_metadata", "cortex_ingester_memory_metadata_created_total", "cortex_ingester_memory_metadata_removed_total"} @@ -2847,6 +2990,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) { require.Greater(t, testutil.ToFloat64(i.TSDBState.idleTsdbChecks.WithLabelValues(string(tsdbIdleClosed))), float64(0)) i.v2UpdateActiveSeries() + require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) // Flushing removed all series from memory. // Verify that user has disappeared from metrics. 
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(` @@ -3227,3 +3371,283 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { cortex_ingester_tsdb_compactions_total 1 `), "cortex_ingester_tsdb_compactions_total")) } + +func TestIngester_v2PushInstanceLimits(t *testing.T) { + tests := map[string]struct { + limits InstanceLimits + reqs map[string][]*cortexpb.WriteRequest + expectedErr error + expectedErrType interface{} + }{ + "should succeed creating one user and series": { + limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1}, + reqs: map[string][]*cortexpb.WriteRequest{ + "test": { + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}})}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, + []*cortexpb.MetricMetadata{ + {MetricFamilyName: "metric_name_1", Help: "a help for metric_name_1", Unit: "", Type: cortexpb.COUNTER}, + }, + cortexpb.API), + }, + }, + expectedErr: nil, + }, + + "should fail creating two series": { + limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1}, + + reqs: map[string][]*cortexpb.WriteRequest{ + "test": { + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test1"}})}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, + nil, + cortexpb.API), + + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test2"}})}, // another series + []cortexpb.Sample{{Value: 1, TimestampMs: 10}}, + nil, + cortexpb.API), + }, + }, + + expectedErr: wrapWithUser(errMaxSeriesLimitReached, "test"), + }, + + "should fail creating two users": { + limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1}, + + reqs: map[string][]*cortexpb.WriteRequest{ + "user1": { + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test1"}})}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, + nil, + cortexpb.API), + }, + + "user2": { + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test2"}})}, // another series + []cortexpb.Sample{{Value: 1, TimestampMs: 10}}, + nil, + cortexpb.API), + }, + }, + expectedErr: wrapWithUser(errMaxUsersLimitReached, "user2"), + }, + + "should fail pushing samples in two requests due to rate limit": { + limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1, MaxIngestionRate: 0.001}, + + reqs: map[string][]*cortexpb.WriteRequest{ + "user1": { + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test1"}})}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, + nil, + cortexpb.API), + + cortexpb.ToWriteRequest( + []labels.Labels{cortexpb.FromLabelAdaptersToLabels([]cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test1"}})}, + []cortexpb.Sample{{Value: 1, TimestampMs: 10}}, + nil, + cortexpb.API), + }, + }, + expectedErr: errMaxSamplesPushRateLimitReached, + }, + } + + defaultInstanceLimits = nil + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Create a mocked ingester + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.InstanceLimitsFn = func() *InstanceLimits { + return &testData.limits + } + + i, err := 
prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Iterate through users in sorted order (by username). + uids := []string{} + totalPushes := 0 + for uid, requests := range testData.reqs { + uids = append(uids, uid) + totalPushes += len(requests) + } + sort.Strings(uids) + + pushIdx := 0 + for _, uid := range uids { + ctx := user.InjectOrgID(context.Background(), uid) + + for _, req := range testData.reqs[uid] { + pushIdx++ + _, err := i.Push(ctx, req) + + if pushIdx < totalPushes { + require.NoError(t, err) + } else { + // Last push may expect error. + if testData.expectedErr != nil { + assert.Equal(t, testData.expectedErr, err) + } else if testData.expectedErrType != nil { + assert.True(t, errors.As(err, testData.expectedErrType), "expected error type %T, got %v", testData.expectedErrType, err) + } else { + assert.NoError(t, err) + } + } + + // imitate time ticking between each push + i.ingestionRate.tick() + + rate := testutil.ToFloat64(i.metrics.ingestionRate) + require.NotZero(t, rate) + } + } + }) + } +} + +func TestIngester_instanceLimitsMetrics(t *testing.T) { + reg := prometheus.NewRegistry() + + l := InstanceLimits{ + MaxIngestionRate: 10, + MaxInMemoryTenants: 20, + MaxInMemorySeries: 30, + } + + cfg := defaultIngesterTestConfig() + cfg.InstanceLimitsFn = func() *InstanceLimits { + return &l + } + cfg.LifecyclerConfig.JoinAfter = 0 + + _, err := prepareIngesterWithBlocksStorage(t, cfg, reg) + require.NoError(t, err) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingester_instance_limits Instance limits used by this ingester. + # TYPE cortex_ingester_instance_limits gauge + cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 0 + cortex_ingester_instance_limits{limit="max_ingestion_rate"} 10 + cortex_ingester_instance_limits{limit="max_series"} 30 + cortex_ingester_instance_limits{limit="max_tenants"} 20 + `), "cortex_ingester_instance_limits")) + + l.MaxInMemoryTenants = 1000 + l.MaxInMemorySeries = 2000 + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingester_instance_limits Instance limits used by this ingester. 
+ # TYPE cortex_ingester_instance_limits gauge + cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 0 + cortex_ingester_instance_limits{limit="max_ingestion_rate"} 10 + cortex_ingester_instance_limits{limit="max_series"} 2000 + cortex_ingester_instance_limits{limit="max_tenants"} 1000 + `), "cortex_ingester_instance_limits")) +} + +func TestIngester_inflightPushRequests(t *testing.T) { + limits := InstanceLimits{MaxInflightPushRequests: 1} + + // Create a mocked ingester + cfg := defaultIngesterTestConfig() + cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } + cfg.LifecyclerConfig.JoinAfter = 0 + + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + + startCh := make(chan struct{}) + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + count := 100000 + target := time.Second + + // find right count to make sure that push takes given target duration. + for { + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("test-%d", count)), count) + + start := time.Now() + _, err := i.Push(ctx, req) + require.NoError(t, err) + + elapsed := time.Since(start) + t.Log(count, elapsed) + if elapsed > time.Second { + break + } + + count = int(float64(count) * float64(target/elapsed) * 1.5) // Adjust number of samples to hit our target push duration. + } + + // Now repeat push with number of samples calibrated to our target. + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", count)), count) + + // Signal that we're going to do the real push now. + close(startCh) + + _, err := i.Push(ctx, req) + return err + }) + + g.Go(func() error { + select { + case <-ctx.Done(): + // failed to setup + case <-startCh: + // we can start the test. + } + + time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing... + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1024) + + _, err := i.Push(ctx, req) + require.Equal(t, errTooManyInflightPushRequests, err) + return nil + }) + + require.NoError(t, g.Wait()) +} + +func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest { + var lbls = make([]labels.Labels, 0, count) + var samples = make([]cortexpb.Sample, 0, count) + + for i := 0; i < count; i++ { + samples = append(samples, cortexpb.Sample{ + Value: float64(i), + TimestampMs: int64(i), + }) + lbls = append(lbls, l) + } + + return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API) +} diff --git a/pkg/ingester/instance_limits.go b/pkg/ingester/instance_limits.go new file mode 100644 index 0000000000..db834bb8f0 --- /dev/null +++ b/pkg/ingester/instance_limits.go @@ -0,0 +1,32 @@ +package ingester + +import "github.com/pkg/errors" + +var ( + // We don't include values in the message to avoid leaking Cortex cluster configuration to users. 
+	errMaxSamplesPushRateLimitReached = errors.New("cannot push more samples: ingester's max samples push rate reached")
+	errMaxUsersLimitReached           = errors.New("cannot create TSDB: ingester's max tenants limit reached")
+	errMaxSeriesLimitReached          = errors.New("cannot add series: ingester's max series limit reached")
+	errTooManyInflightPushRequests    = errors.New("cannot push: too many inflight push requests")
+)
+
+// InstanceLimits describes limits used by ingester. Reaching any of these will cause the Push method to return an
+// (internal) error.
+type InstanceLimits struct {
+	MaxIngestionRate        float64 `yaml:"max_ingestion_rate"`
+	MaxInMemoryTenants      int64   `yaml:"max_tenants"`
+	MaxInMemorySeries       int64   `yaml:"max_series"`
+	MaxInflightPushRequests int64   `yaml:"max_inflight_push_requests"`
+}
+
+// Default instance limits, used as base values when unmarshalling from YAML. Set from the flag-configured defaults.
+var defaultInstanceLimits *InstanceLimits = nil
+
+// UnmarshalYAML implements the yaml.Unmarshaler interface. Default limits are used as starting values, so fields not present in the YAML keep their defaults.
+func (l *InstanceLimits) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	if defaultInstanceLimits != nil {
+		*l = *defaultInstanceLimits
+	}
+	type plain InstanceLimits // type indirection to make sure we don't go into recursive loop
+	return unmarshal((*plain)(l))
+}
diff --git a/pkg/ingester/instance_limits_test.go b/pkg/ingester/instance_limits_test.go
new file mode 100644
index 0000000000..ece468511b
--- /dev/null
+++ b/pkg/ingester/instance_limits_test.go
@@ -0,0 +1,29 @@
+package ingester
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gopkg.in/yaml.v2"
+)
+
+func TestInstanceLimitsUnmarshal(t *testing.T) {
+	defaultInstanceLimits = &InstanceLimits{
+		MaxIngestionRate:        10,
+		MaxInMemoryTenants:      20,
+		MaxInMemorySeries:       30,
+		MaxInflightPushRequests: 40,
+	}
+
+	l := InstanceLimits{}
+	input := `
+max_ingestion_rate: 125.678
+max_tenants: 50000
+`
+
+	require.NoError(t, yaml.UnmarshalStrict([]byte(input), &l))
+	require.Equal(t, float64(125.678), l.MaxIngestionRate)
+	require.Equal(t, int64(50000), l.MaxInMemoryTenants)
+	require.Equal(t, int64(30), l.MaxInMemorySeries)        // default value
+	require.Equal(t, int64(40), l.MaxInflightPushRequests)  // default value
+}
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index c2e4cc65e6..cb06770fe1 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -3,6 +3,7 @@ package ingester
 import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/atomic"
 
 	"github.com/cortexproject/cortex/pkg/util"
 )
@@ -53,9 +54,23 @@ type ingesterMetrics struct {
 	oldestUnflushedChunkTimestamp prometheus.Gauge
 
 	activeSeriesPerUser *prometheus.GaugeVec
+
+	// Global limit metrics
+	maxUsersGauge           prometheus.GaugeFunc
+	maxSeriesGauge          prometheus.GaugeFunc
+	maxIngestionRate        prometheus.GaugeFunc
+	ingestionRate           prometheus.GaugeFunc
+	maxInflightPushRequests prometheus.GaugeFunc
+	inflightRequests        prometheus.GaugeFunc
 }
 
-func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool) *ingesterMetrics {
+func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *ewmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
+	const (
+		instanceLimits     = "cortex_ingester_instance_limits"
+		instanceLimitsHelp = "Instance limits used by this ingester." // Must be the same for all registrations.
+ limitLabel = "limit" + ) + m := &ingesterMetrics{ flushQueueLength: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Name: "cortex_ingester_flush_queue_length", @@ -190,6 +205,70 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD Help: "Unix timestamp of the oldest unflushed chunk in the memory", }), + maxUsersGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: instanceLimits, + Help: instanceLimitsHelp, + ConstLabels: map[string]string{limitLabel: "max_tenants"}, + }, func() float64 { + if g := instanceLimitsFn(); g != nil { + return float64(g.MaxInMemoryTenants) + } + return 0 + }), + + maxSeriesGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: instanceLimits, + Help: instanceLimitsHelp, + ConstLabels: map[string]string{limitLabel: "max_series"}, + }, func() float64 { + if g := instanceLimitsFn(); g != nil { + return float64(g.MaxInMemorySeries) + } + return 0 + }), + + maxIngestionRate: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: instanceLimits, + Help: instanceLimitsHelp, + ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"}, + }, func() float64 { + if g := instanceLimitsFn(); g != nil { + return float64(g.MaxIngestionRate) + } + return 0 + }), + + maxInflightPushRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: instanceLimits, + Help: instanceLimitsHelp, + ConstLabels: map[string]string{limitLabel: "max_inflight_push_requests"}, + }, func() float64 { + if g := instanceLimitsFn(); g != nil { + return float64(g.MaxInflightPushRequests) + } + return 0 + }), + + ingestionRate: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingester_ingestion_rate_samples_per_second", + Help: "Current ingestion rate in samples/sec that ingester is using to limit access.", + }, func() float64 { + if ingestionRate != nil { + return ingestionRate.rate() + } + return 0 + }), + + inflightRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingester_inflight_push_requests", + Help: "Current number of inflight push requests", + }, func() float64 { + if inflightRequests != nil { + return float64(inflightRequests.Load()) + } + return 0 + }), + // Not registered automatically, but only if activeSeriesEnabled is true. activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_ingester_active_series", diff --git a/pkg/ingester/rate.go b/pkg/ingester/rate.go index ecabd8b783..3ed7f45db7 100644 --- a/pkg/ingester/rate.go +++ b/pkg/ingester/rate.go @@ -10,11 +10,13 @@ import ( // ewmaRate tracks an exponentially weighted moving average of a per-second rate. type ewmaRate struct { newEvents atomic.Int64 - alpha float64 - interval time.Duration - lastRate float64 - init bool - mutex sync.Mutex + + alpha float64 + interval time.Duration + + mutex sync.RWMutex + lastRate float64 + init bool } func newEWMARate(alpha float64, interval time.Duration) *ewmaRate { @@ -26,15 +28,14 @@ func newEWMARate(alpha float64, interval time.Duration) *ewmaRate { // rate returns the per-second rate. func (r *ewmaRate) rate() float64 { - r.mutex.Lock() - defer r.mutex.Unlock() + r.mutex.RLock() + defer r.mutex.RUnlock() return r.lastRate } // tick assumes to be called every r.interval. func (r *ewmaRate) tick() { - newEvents := r.newEvents.Load() - r.newEvents.Sub(newEvents) + newEvents := r.newEvents.Swap(0) instantRate := float64(newEvents) / r.interval.Seconds() r.mutex.Lock()
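
For reference, a minimal sketch of how the new per-ingester limits can be set through the runtime-config file, using the `ingester_limits` block wired into `runtimeConfigValues` above. The numeric values are illustrative assumptions, not recommendations:

    ingester_limits:
      max_ingestion_rate: 20000
      max_tenants: 1000
      max_series: 1500000
      max_inflight_push_requests: 30000

The same limits can be given startup defaults via the `-ingester.instance-limits.*` flags registered in `Config.RegisterFlags`. When the `ingester_limits` block is present in the runtime config, its values take effect without a restart; fields omitted from the YAML fall back to the flag defaults (see `InstanceLimits.UnmarshalYAML`), and the currently effective values are exported through the `cortex_ingester_instance_limits` metric.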