Skip to content

Commit

Permalink
Count the number of rate-limited samples in distributor_samples_in_to…
Browse files Browse the repository at this point in the history
…tal (#5714)

* Count the number of rate-limited samples in distributor_samples_in_total

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Update changelog

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

---------

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
  • Loading branch information
damnever authored Jan 30, 2024
1 parent 86af904 commit 7d79cf0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735
* [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
Expand Down
23 changes: 11 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,21 +583,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
inflight := d.inflightPushRequests.Inc()
defer d.inflightPushRequests.Dec()

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

now := time.Now()
d.activeUsers.UpdateUserTimestamp(userID, now)

removeReplica := false

numSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
Expand All @@ -610,6 +598,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, errTooManyInflightPushRequests
}

if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
return nil, errMaxSamplesPushRateLimitReached
}
}

removeReplica := false
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
limits := d.limits.GetOverridesForUser(userID)

Expand Down

0 comments on commit 7d79cf0

Please sign in to comment.