Change throttle limit distribution logic #670

Merged: 4 commits, Sep 12, 2024
9 changes: 6 additions & 3 deletions plugin/action/throttle/README.md
@@ -97,6 +97,8 @@ It allows distributing the `default_limit` between events by condition.
> All events for which the value in the `field` doesn't fall into any of the distributions:
> * fall into default distribution, if it exists
> * throttled, otherwise
> 3. The **default distribution** can "steal" limit from other distributions after it has exhausted its own.
> This is done to avoid reserving limits for explicitly defined distributions.

`LimitDistributionConfig` example:
```yaml
@@ -108,9 +110,10 @@ ratios:
values: ['warn', 'info']
```
For this config and the `default_limit=100`:
* events with `log.level=error` will have a limit of 50
* events with `log.level=warn` or `log.level=info` will have a limit of 30
* all other events will have a limit of 20
* there will be NO MORE than `50` events with `log.level=error`
* there will be NO MORE than `30` events with `log.level=warn` or `log.level=info`
* there will be AT LEAST `20` other events
(can be up to `100` if there are no events with `log.level=error/warn/info`; see the sketch below)
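
To make the arithmetic concrete, here is a minimal, self-contained Go sketch (illustrative only; the `distribution` type and variable names are not the plugin's actual implementation) of how the limits above follow from `default_limit` and the ratios:

```go
package main

import "fmt"

// distribution mirrors a single "ratios" entry; the type is illustrative,
// not the plugin's real data structure.
type distribution struct {
	ratio float64
	limit int64
}

func main() {
	defaultLimit := int64(100)
	distributions := []distribution{
		{ratio: 0.5}, // values: ['error']
		{ratio: 0.3}, // values: ['warn', 'info']
	}

	// Each explicitly configured distribution gets ratio * default_limit.
	used := int64(0)
	for i := range distributions {
		distributions[i].limit = int64(distributions[i].ratio * float64(defaultLimit))
		used += distributions[i].limit
	}

	// Whatever remains is the default distribution's own budget.
	defaultDistrLimit := defaultLimit - used

	fmt.Println(distributions[0].limit, distributions[1].limit, defaultDistrLimit) // 50 30 20
}
```

The leftover `20` is the default distribution's own budget; with the change in this PR it may additionally borrow unused capacity from the explicit distributions.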

<br>

2 changes: 1 addition & 1 deletion plugin/action/throttle/distribution.go
@@ -110,7 +110,7 @@ func (ld *limitDistributions) size() int {
return len(ld.distributions)
}

// get returns (index, distribution limit) by key or (-1, default distribution limit) otherwise
// getLimit returns (index, distribution limit) by key or (-1, default distribution limit) otherwise
func (ld *limitDistributions) getLimit(key string) (int, int64) {
if idx, ok := ld.idxByKey[key]; ok {
return idx, ld.distributions[idx].limit
89 changes: 65 additions & 24 deletions plugin/action/throttle/in_memory_limiter.go
@@ -69,20 +69,16 @@ func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
l.lock()
defer l.unlock()

id := l.rebuildBuckets(ts)
index := id - l.buckets.getMinID()

// If the limit is given with distribution, then distributed buckets are used
distrIdx := 0
distrFieldVal := ""
if l.limit.distributions.isEnabled() {
distrFieldVal = event.Root.Dig(l.limit.distributions.field...).AsString()
distrIdx, limit = l.limit.distributions.getLimit(distrFieldVal)

// The distribution index in the bucket matches the distribution value index in distributions,
// but is shifted by 1 because default distribution has index 0.
distrIdx++
distrFieldVal, distrIdx, limit = l.getDistrData(index, event)
}

id := l.rebuildBuckets(ts)
index := id - l.buckets.getMinID()
switch l.limit.kind {
case "", limitKindCount:
l.buckets.add(index, distrIdx, 1)
@@ -93,28 +89,73 @@
}

isAllowed := l.buckets.get(index, distrIdx) <= limit

if !isAllowed && l.limit.distributions.isEnabled() {
l.metricLabelsBuf = l.metricLabelsBuf[:0]

l.metricLabelsBuf = append(l.metricLabelsBuf, distrFieldVal)
for _, lbl := range l.limitDistrMetrics.CustomLabels {
val := "not_set"
node := event.Root.Dig(lbl)
if node != nil {
val = node.AsString()
}
l.metricLabelsBuf = append(l.metricLabelsBuf, val)
l.updateDistrMetrics(distrFieldVal, event)
}

return isAllowed
}

// getDistrData returns distribution field value, index and limit
func (l *inMemoryLimiter) getDistrData(bucketIdx int, event *pipeline.Event) (string, int, int64) {
fieldVal := event.Root.Dig(l.limit.distributions.field...).AsString()
idx, limit := l.limit.distributions.getLimit(fieldVal)

// The distribution index in the bucket matches the distribution value index in distributions,
// but is shifted by 1 because default distribution has index 0.
idx++

if idx > 0 {
return fieldVal, idx, limit
}

// For the default distribution, check in advance that we are within the limit.
// If not, try to steal reserve from the distribution with the most free space.
val := int64(1)
if l.limit.kind == limitKindSize {
val = int64(event.Size)
}

// Within the limit
if l.buckets.get(bucketIdx, idx)+val <= limit {
return fieldVal, idx, limit
}

// Look for the distribution with the most free space.
// If found, update idx and limit so that a different bucket is used for the allowance check.
maxDiff := int64(-1)
for i, d := range l.limit.distributions.distributions {
curVal := l.buckets.get(bucketIdx, i+1)
if curDiff := d.limit - (curVal + val); curDiff > maxDiff {
maxDiff = curDiff
idx = i + 1
limit = d.limit
}
}

return fieldVal, idx, limit
}
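
As a standalone illustration of the selection step above (a hypothetical helper with made-up names that mirrors the rule, not the limiter's real types), picking the donor distribution boils down to choosing the largest remaining headroom:

```go
package main

import "fmt"

// pickDonor returns the index of the distribution with the most remaining
// headroom for an event of weight val, or -1 if no distribution has room.
func pickDonor(limits, used []int64, val int64) int {
	best, maxDiff := -1, int64(-1)
	for i := range limits {
		if diff := limits[i] - (used[i] + val); diff > maxDiff {
			maxDiff = diff
			best = i
		}
	}
	return best
}

func main() {
	// "error" has used 4 of 6, "warn,info" has used 1 of 4; an over-limit
	// default-distribution event of weight 1 is charged to "warn,info" (index 1),
	// because it has the most free space left.
	fmt.Println(pickDonor([]int64{6, 4}, []int64{4, 1}, 1))
}
```

With `limits = {6, 4}` and `used = {4, 1}` this returns `1`, matching how the `debug` events are accounted for in the test below.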

func (l *inMemoryLimiter) updateDistrMetrics(fieldVal string, event *pipeline.Event) {
l.metricLabelsBuf = l.metricLabelsBuf[:0]

switch l.limit.kind {
case "", limitKindCount:
l.limitDistrMetrics.EventsCount.WithLabelValues(l.metricLabelsBuf...).Inc()
case limitKindSize:
l.limitDistrMetrics.EventsSize.WithLabelValues(l.metricLabelsBuf...).Add(float64(event.Size))
l.metricLabelsBuf = append(l.metricLabelsBuf, fieldVal)
for _, lbl := range l.limitDistrMetrics.CustomLabels {
val := "not_set"
node := event.Root.Dig(lbl)
if node != nil {
val = node.AsString()
}
l.metricLabelsBuf = append(l.metricLabelsBuf, val)
}

return isAllowed
switch l.limit.kind {
case "", limitKindCount:
l.limitDistrMetrics.EventsCount.WithLabelValues(l.metricLabelsBuf...).Inc()
case limitKindSize:
l.limitDistrMetrics.EventsSize.WithLabelValues(l.metricLabelsBuf...).Add(float64(event.Size))
}
}

func (l *inMemoryLimiter) lock() {
9 changes: 6 additions & 3 deletions plugin/action/throttle/throttle.go
@@ -144,6 +144,8 @@ type Config struct {
// >> All events for which the value in the `field` doesn't fall into any of the distributions:
// >> * fall into default distribution, if it exists
// >> * throttled, otherwise
// >> 3. The **default distribution** can "steal" limit from other distributions after it has exhausted its own.
// >> This is done to avoid reserving limits for explicitly defined distributions.
// >
// > `LimitDistributionConfig` example:
// > ```yaml
@@ -155,9 +157,10 @@
// > values: ['warn', 'info']
// > ```
// > For this config and the `default_limit=100`:
// > * events with `log.level=error` will have a limit of 50
// > * events with `log.level=warn` or `log.level=info` will have a limit of 30
// > * all other events will have a limit of 20
// > * there will be NO MORE than `50` events with `log.level=error`
// > * there will be NO MORE than `30` events with `log.level=warn` or `log.level=info`
// > * there will be AT LEAST `20` other events
// > (can be up to `100` if there are no events with `log.level=error/warn/info`)
LimitDistribution LimitDistributionConfig `json:"limit_distribution" child:"true"` // *
}

48 changes: 27 additions & 21 deletions plugin/action/throttle/throttle_test.go
@@ -581,32 +581,38 @@ func TestThrottleWithDistribution(t *testing.T) {
outEvents[level]++
wg.Done()
})

// error - limit 6
// info, warn - limit 4
// default - limit 2
events := []string{
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 1/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // 1/4
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 2/6
`{"time":"%s","k8s_pod":"pod_1","level":""}`, // 1/2
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // 2/2
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 3/6
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 4/6
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // steal from "info, warn" - 2/4
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`, // 3/4
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // 5/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // 4/4
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // steal from "error" - 6/6
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":""}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`, // throttled
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`, // throttled
}

wantOutEvents := map[string]int{
"error": 6,
"info": 3,
"error": 5,
"info": 2,
"warn": 1,
"debug": 1,
"debug": 3,
"": 1,
}

events := []string{
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":""}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"info"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"warn"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"debug"}`,
`{"time":"%s","k8s_pod":"pod_1","level":"error"}`,
}

nowTs := time.Now().Format(time.RFC3339Nano)
for i := 0; i < len(events); i++ {
json := fmt.Sprintf(events[i], nowTs)