Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch adding series to query limiter to optimize locking #5505

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
Expand Down
20 changes: 10 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, m := range resp.Metric {
if err := queryLimiter.AddSeries(m.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}
s = append(s, m.Labels)
m := cortexpb.FromLabelAdaptersToMetric(m.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}

if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
return nil, nil
})

Expand All @@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
} else if err != nil {
return nil, err
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, metric := range resp.Metric {
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)

if err := queryLimiter.AddSeries(metric.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}

s = append(s, metric.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}
if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
}

return nil, nil
Expand Down
19 changes: 10 additions & 9 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(chunkLimitErr.Error())
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries))
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
s = append(s, series.Labels)
}

for _, series := range resp.Timeseries {
s = append(s, series.Labels)
}

if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}

if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil {
Expand All @@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/util/limiter/query_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,23 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
return ql
}

// AddSeries adds the input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error {
// AddSeries adds the batch of input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(series ...[]cortexpb.LabelAdapter) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small nit:

Can we receive a [][]cortexpb.LabelAdapter instead of ...[]cortexpb.LabelAdapter to avoid copying this slice over?

From go doc:

If f is variadic with a final parameter p of type ...T, then within f the type of p is equivalent to type []T. If f is invoked with no actual arguments for p, the value passed to p is nil. Otherwise, the value passed is a new slice of type []T with a new underlying array whose successive elements are the actual arguments, which all must be assignable to T. The length and capacity of the slice is therefore the number of arguments bound to p and may differ for each call site.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong, but the doc says that if we pass an existing slice into a variadic function with `...`, no new underlying array is created, right?

If the final argument is assignable to a slice type []T and is followed by ..., it is passed unchanged as the value for a ...T parameter. In this case no new slice is created.

series := [][]cortexpb.LabelAdapter{s1, s2}
AddSeries(series...) // This will not create a new underlying array.

// If the max series is unlimited just return without managing map
if ql.maxSeriesPerQuery == 0 {
return nil
}
fingerprint := client.FastFingerprint(seriesLabels)
fps := make([]model.Fingerprint, 0, len(series))
for _, s := range series {
fps = append(fps, client.FastFingerprint(s))
}

ql.uniqueSeriesMx.Lock()
defer ql.uniqueSeriesMx.Unlock()
for _, fp := range fps {
ql.uniqueSeries[fp] = struct{}{}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am okay with this, but I am also wondering if we can calculate the hashes first, before we take the lock.
This requires allocating a slice of int64, so I'm not sure it is worth it. We probably need a benchmark here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did see some improvement when hashing the series outside the lock. I'll implement your suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harry671003 Can you run the benchmark again? And let's also add -benchmem to show memory allocation?

Copy link
Contributor Author

@harry671003 harry671003 Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# old.out - Without the slice of fps
# new.out - With the slice of fps

> benchstat old.out new.out                                                                                                                              
goos: darwin
goarch: arm64
pkg: github.com/cortexproject/cortex/pkg/util/limiter
                               │   old.out   │               new.out               │
                               │   sec/op    │   sec/op     vs base                │
QueryLimiter_AddSeriesBatch-10   12.97µ ± 7%   11.39µ ± 3%  -12.20% (p=0.000 n=10)

                               │   old.out    │               new.out               │
                               │     B/op     │     B/op      vs base               │
QueryLimiter_AddSeriesBatch-10   30.20Ki ± 0%   30.98Ki ± 0%  +2.59% (p=0.000 n=10)

                               │  old.out   │              new.out              │
                               │ allocs/op  │ allocs/op   vs base               │
QueryLimiter_AddSeriesBatch-10   400.0 ± 0%   401.0 ± 0%  +0.25% (p=0.000 n=10)


ql.uniqueSeries[fingerprint] = struct{}{}
if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
// Format error with max limit
return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery)
Expand Down
88 changes: 76 additions & 12 deletions pkg/util/limiter/query_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limiter

import (
"fmt"
"sync"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -87,6 +88,37 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
require.Error(t, err)
}

// TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded adds ten
// distinct series to the limiter in a single AddSeries call and expects no
// error, then adds one more distinct series and expects an error.
func TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
	const metricName = "test_metric"

	// NOTE(review): the first argument is presumably the max-series limit — confirm against NewQueryLimiter.
	limiter := NewQueryLimiter(10, 0, 0, 0)

	// Build ten single-label series whose metric names differ only by index.
	batch := make([][]cortexpb.LabelAdapter, 10)
	for idx := range batch {
		batch[idx] = []cortexpb.LabelAdapter{
			{Name: labels.MetricName, Value: fmt.Sprintf("%v_%v", metricName, idx)},
		}
	}

	// The whole batch goes in through one variadic call.
	require.NoError(t, limiter.AddSeries(batch...))

	// One additional series with a name not used in the batch.
	extra := []cortexpb.LabelAdapter{
		{Name: labels.MetricName, Value: metricName + "_11"},
	}
	require.Error(t, limiter.AddSeries(extra))
}

func TestQueryLimiter_AddChunkBytes(t *testing.T) {
var limiter = NewQueryLimiter(0, 100, 0, 0)

Expand All @@ -106,23 +138,55 @@ func TestQueryLimiter_AddDataBytes(t *testing.T) {
}

// BenchmarkQueryLimiter_AddSeries runs AddSeriesConcurrentBench with a batch
// size of 1, i.e. each AddSeries call receives a single series.
func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
AddSeriesConcurrentBench(b, 1)
}

// BenchmarkQueryLimiter_AddSeriesBatch runs AddSeriesConcurrentBench with a
// batch size of 128, i.e. each AddSeries call receives up to 128 series.
func BenchmarkQueryLimiter_AddSeriesBatch(b *testing.B) {
AddSeriesConcurrentBench(b, 128)
}

func AddSeriesConcurrentBench(b *testing.B, batchSize int) {
b.ResetTimer()
const (
metricName = "test_metric"
)
var series []labels.Labels
for i := 0; i < b.N; i++ {
series = append(series,
labels.FromMap(map[string]string{
labels.MetricName: metricName + "_1",
"series1": fmt.Sprint(i),
}))
}
b.ResetTimer()

limiter := NewQueryLimiter(b.N+1, 0, 0, 0)
for _, s := range series {
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
assert.NoError(b, err)

// Concurrent goroutines trying to add duplicated series
const numWorkers = 100
var wg sync.WaitGroup

worker := func(w int) {
defer wg.Done()
var series []labels.Labels
for i := 0; i < b.N; i++ {
series = append(series,
labels.FromMap(map[string]string{
labels.MetricName: metricName + "_1",
"series1": fmt.Sprint(i),
}))
}

for i := 0; i < len(series); i += batchSize {
s := make([][]cortexpb.LabelAdapter, 0, batchSize)
j := i + batchSize
if j > len(series) {
j = len(series)
}
for k := i; k < j; k++ {
s = append(s, cortexpb.FromLabelsToLabelAdapters(series[k]))
}

err := limiter.AddSeries(s...)
assert.NoError(b, err)
}
}

for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w)
}

wg.Wait()
}
Loading