Distributor limits #4071

Merged: 8 commits, Apr 15, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* [ENHANCEMENT] Add a metric `cortex_compactor_compaction_interval_seconds` for the compaction interval config value. #4040
* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
* [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
* [ENHANCEMENT] Distributor: added per-distributor limits: max number of inflight requests (`-distributor.instance-limits.max-inflight-push-requests`) and max ingestion rate in samples/sec (`-distributor.instance-limits.max-ingestion-rate`). If not set, these two are unlimited. Also added metrics to expose current values (`cortex_distributor_inflight_push_requests`, `cortex_distributor_ingestion_rate_samples_per_second`) as well as limits (`cortex_distributor_instance_limits` with various `limit` label values). #4071
* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -575,6 +575,20 @@ ring:
# Name of network interface to read address from.
# CLI flag: -distributor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

instance_limits:
# Max ingestion rate (samples/sec) that this distributor will accept. This
# limit is per-distributor, not per-tenant. Additional push requests will be
# rejected. Current ingestion rate is computed as exponentially weighted
# moving average, updated every second. 0 = unlimited.
# CLI flag: -distributor.instance-limits.max-ingestion-rate
[max_ingestion_rate: <float> | default = 0]

# Max inflight push requests that this distributor can handle. This limit is
# per-distributor, not per-tenant. Additional requests will be rejected. 0 =
# unlimited.
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
```
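
As a rough illustration of how the two settings documented above map onto CLI flags, here is a minimal sketch using only the standard `flag` package. The type `instanceLimits` and the helper `parseLimits` are hypothetical names introduced for this example, and the values 50000 and 2000 are arbitrary; only the flag names and the "0 = unlimited" defaults come from the reference above.

```go
package main

import (
	"flag"
	"fmt"
)

// instanceLimits is a hypothetical, stand-alone mirror of the two
// per-distributor settings. A value of 0 means "unlimited".
type instanceLimits struct {
	MaxIngestionRate        float64
	MaxInflightPushRequests int
}

// registerFlags binds both limits to the flag names from the reference.
func (l *instanceLimits) registerFlags(f *flag.FlagSet) {
	f.Float64Var(&l.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0,
		"Max ingestion rate (samples/sec). 0 = unlimited.")
	f.IntVar(&l.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0,
		"Max inflight push requests. 0 = unlimited.")
}

// parseLimits parses args into a fresh instanceLimits value.
func parseLimits(args []string) (instanceLimits, error) {
	var l instanceLimits
	fs := flag.NewFlagSet("distributor", flag.ContinueOnError)
	l.registerFlags(fs)
	err := fs.Parse(args)
	return l, err
}

func main() {
	// Illustrative values only; pick limits that match your own capacity.
	l, err := parseLimits([]string{
		"-distributor.instance-limits.max-ingestion-rate=50000",
		"-distributor.instance-limits.max-inflight-push-requests=2000",
	})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", l)
}
```

Leaving either flag unset keeps its default of 0, which disables that limit.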

### `ingester_config`
80 changes: 78 additions & 2 deletions pkg/distributor/distributor.go
@@ -22,6 +22,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,7 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/limiter"
- "github.com/cortexproject/cortex/pkg/util/math"
+ util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)
@@ -45,11 +46,17 @@ var (
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

// Distributor instance limits errors.
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
)

const (
typeSamples = "samples"
typeMetadata = "metadata"

instanceIngestionRateTickInterval = time.Second
)

// Distributor is a storage.SampleAppender and a client.Querier which
@@ -79,6 +86,9 @@ type Distributor struct {

activeUsers *util.ActiveUsersCleanupService

ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64

// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
@@ -123,6 +133,14 @@ type Config struct {

// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// Limits for distributor
InstanceLimits InstanceLimits `yaml:"instance_limits"`
}

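// InstanceLimits holds per-distributor (not per-tenant) limits. A zero value disables the corresponding limit.
type InstanceLimits struct {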
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -137,6 +155,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
}

// Validate config and returns error on failure
@@ -152,6 +173,12 @@ func (cfg *Config) Validate(limits validation.Limits) error {
return cfg.HATrackerConfig.Validate()
}

const (
instanceLimitsMetric = "cortex_distributor_instance_limits"
instanceLimitsMetricHelp = "Instance limits used by this distributor." // Must be same for all registrations.
limitLabel = "limit"
)

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
if cfg.IngesterClientFactory == nil {
@@ -200,6 +227,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
@@ -273,6 +301,31 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
}

promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_inflight_push_requests"},
}).Set(float64(cfg.InstanceLimits.MaxInflightPushRequests))
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_ingestion_rate"},
}).Set(cfg.InstanceLimits.MaxIngestionRate)

promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_inflight_push_requests",
Help: "Current number of inflight push requests in distributor.",
}, func() float64 {
return float64(d.inflightPushRequests.Load())
})
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_distributor_ingestion_rate_samples_per_second",
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
}, func() float64 {
return d.ingestionRate.Rate()
})

d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
d.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(d.cleanupInactiveUser)

@@ -294,11 +347,17 @@ func (d *Distributor) starting(ctx context.Context) error {
}

func (d *Distributor) running(ctx context.Context) error {
ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
defer ingestionRateTicker.Stop()

for {
select {
case <-ctx.Done():
return nil

case <-ingestionRateTicker.C:
d.ingestionRate.Tick()

case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
@@ -443,6 +502,20 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, err
}

// The inflight count includes *this* request, so the limit check below accounts for it too.
inflight := d.inflightPushRequests.Inc()
defer d.inflightPushRequests.Dec()

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)

@@ -508,7 +581,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
for _, ts := range req.Timeseries {
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
if len(ts.Samples) > 0 {
- latestSampleTimestampMs = math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
+ latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
@@ -603,6 +676,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
}

// totalN includes samples and metadata. The ingester follows the same pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

subRing := d.ingestersRing

// Obtain a subring if required.
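
The admission logic added to `Push` in this file can be summarized in a stand-alone sketch: count the request as inflight, reject it if the count exceeds the limit, then reject it if the current rate has reached the rate limit. The type `gate`, its fields, and the `do` method are hypothetical names for illustration and are not part of the PR; the sketch uses the standard `sync/atomic` package rather than `go.uber.org/atomic`, and the rate is supplied by a caller-provided function instead of an EWMA tracker.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var (
	errTooManyInflight  = errors.New("too many inflight push requests in distributor")
	errRateLimitReached = errors.New("distributor's samples push rate limit reached")
)

// gate is a hypothetical stand-in for the per-instance checks in Push.
type gate struct {
	inflight    int64          // accessed atomically; kept first for 64-bit alignment
	maxInflight int64          // 0 = unlimited
	maxRate     float64        // 0 = unlimited
	currentRate func() float64 // supplies the current samples/sec estimate
}

// do runs fn only if both instance limits allow it.
func (g *gate) do(fn func() error) error {
	// Count this request first, so the comparison includes it.
	n := atomic.AddInt64(&g.inflight, 1)
	defer atomic.AddInt64(&g.inflight, -1)

	if g.maxInflight > 0 && n > g.maxInflight {
		return errTooManyInflight
	}
	// The rate check uses >=, so a rate exactly at the limit is rejected.
	if g.maxRate > 0 && g.currentRate() >= g.maxRate {
		return errRateLimitReached
	}
	return fn()
}

func main() {
	g := &gate{maxInflight: 1, maxRate: 1000, currentRate: func() float64 { return 500 }}

	// A second request arriving while the first is still in flight is rejected.
	err := g.do(func() error {
		return g.do(func() error { return nil })
	})
	fmt.Println(err)
}
```

Because the counter is decremented in a `defer`, rejected requests also release their slot, which matches the `Inc`/`defer Dec` pairing in the diff.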
153 changes: 153 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -446,6 +446,155 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushInstanceLimits(t *testing.T) {
type testPush struct {
samples int
metadata int
expectedError error
}

tests := map[string]struct {
preInflight int
preRateSamples int // initial rate before first push
pushes []testPush // rate is recomputed after each push

// limits
inflightLimit int
ingestionRateLimit float64

metricNames []string
expectedMetrics string
}{
"no limits limit": {
preInflight: 100,
preRateSamples: 1000,

pushes: []testPush{
{samples: 100, expectedError: nil},
},

metricNames: []string{instanceLimitsMetric},
expectedMetrics: `
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
`,
},
"below inflight limit": {
preInflight: 100,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: nil},
},

metricNames: []string{instanceLimitsMetric, "cortex_distributor_inflight_push_requests"},
expectedMetrics: `
# HELP cortex_distributor_inflight_push_requests Current number of inflight push requests in distributor.
# TYPE cortex_distributor_inflight_push_requests gauge
cortex_distributor_inflight_push_requests 100

# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 101
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
`,
},
"hits inflight limit": {
preInflight: 101,
inflightLimit: 101,
pushes: []testPush{
{samples: 100, expectedError: errTooManyInflightPushRequests},
},
},
"below ingestion rate limit": {
preRateSamples: 500,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 1000, expectedError: nil},
},

metricNames: []string{instanceLimitsMetric, "cortex_distributor_ingestion_rate_samples_per_second"},
expectedMetrics: `
# HELP cortex_distributor_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that distributor is using to limit access.
# TYPE cortex_distributor_ingestion_rate_samples_per_second gauge
cortex_distributor_ingestion_rate_samples_per_second 600

# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
# TYPE cortex_distributor_instance_limits gauge
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 1000
`,
},
"hits rate limit on first request, but second request can proceed": {
preRateSamples: 1200,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 100, expectedError: errMaxSamplesPushRateLimitReached},
{samples: 100, expectedError: nil},
},
},

"below rate limit on first request, but hits the rate limit afterwards": {
preRateSamples: 500,
ingestionRateLimit: 1000,

pushes: []testPush{
{samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120
{samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896
{samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Start all expected distributors
distributors, _, r, regs := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
maxInflightRequests: testData.inflightLimit,
maxIngestionRate: testData.ingestionRateLimit,
})
defer stopAll(distributors, r)

d := distributors[0]
d.inflightPushRequests.Add(int64(testData.preInflight))
d.ingestionRate.Add(int64(testData.preRateSamples))

d.ingestionRate.Tick()

for _, push := range testData.pushes {
request := makeWriteRequest(0, push.samples, push.metadata)
_, err := d.Push(ctx, request)

if push.expectedError == nil {
assert.Nil(t, err)
} else {
assert.Equal(t, push.expectedError, err)
}

d.ingestionRate.Tick()

if testData.expectedMetrics != "" {
assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(testData.expectedMetrics), testData.metricNames...))
}
}
})
}
}

func TestDistributor_PushHAInstances(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

@@ -1478,6 +1627,8 @@ type prepConfig struct {
limits *validation.Limits
numDistributors int
skipLabelNameValidation bool
maxInflightRequests int
maxIngestionRate float64
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
@@ -1558,6 +1709,8 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
distributorCfg.DistributorRing.KVStore.Mock = kvStore
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation
distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests
distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
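
The rate arithmetic in the comments of `TestDistributor_PushInstanceLimits` can be reproduced with a small stand-alone sketch. `ewmaRate` and `replay` are hypothetical names and this is not the actual `util_math.EwmaRate` implementation. The sketch assumes that the first tick seeds the rate with the instantaneous value and that later ticks apply `rate += alpha * (instant - rate)`, which is consistent with the expected values in the test table (for example, 500 followed by 1000 samples giving 600 at alpha 0.2).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ewmaRate is a hypothetical stand-in for an EWMA rate tracker.
type ewmaRate struct {
	mu       sync.Mutex
	alpha    float64
	interval time.Duration
	pending  int64   // events added since the last tick
	rate     float64 // smoothed events per second
	seeded   bool    // false until the first tick
}

func newEwmaRate(alpha float64, interval time.Duration) *ewmaRate {
	return &ewmaRate{alpha: alpha, interval: interval}
}

// Add records n new events for the current interval.
func (r *ewmaRate) Add(n int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pending += n
}

// Tick folds the events of the finished interval into the smoothed rate.
func (r *ewmaRate) Tick() {
	r.mu.Lock()
	defer r.mu.Unlock()
	instant := float64(r.pending) / r.interval.Seconds()
	r.pending = 0
	if !r.seeded {
		// Assumption: the first tick seeds the rate directly.
		r.rate, r.seeded = instant, true
		return
	}
	r.rate += r.alpha * (instant - r.rate)
}

// Rate returns the current smoothed rate.
func (r *ewmaRate) Rate() float64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rate
}

// replay adds each sample count, ticks once, and records the rate.
func replay(alpha float64, samplesPerTick []int64) []float64 {
	r := newEwmaRate(alpha, time.Second)
	rates := make([]float64, 0, len(samplesPerTick))
	for _, n := range samplesPerTick {
		r.Add(n)
		r.Tick()
		rates = append(rates, r.Rate())
	}
	return rates
}

func main() {
	// Mirrors the last test case: seed with 500, one accepted push of 5000,
	// two rejected pushes (0 samples added), then another accepted push.
	fmt.Println(replay(0.2, []int64{500, 5000, 0, 0, 5000}))
}
```

Rejected pushes contribute no samples, so the rate decays by 20% per tick until it drops below the limit and the next push is admitted again.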