Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the number of workers processing federated query configurable #6449

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ tenant_federation:
# CLI flag: -tenant-federation.enabled
[enabled: <boolean> | default = false]

# The number of workers used to process each federated query.
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// single tenant. This allows for a less impactful enabling of tenant
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier))
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
}
return nil, nil
}
Expand Down
44 changes: 33 additions & 11 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand All @@ -19,9 +21,9 @@ import (
)

const (
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
maxConcurrency = 16
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
defaultMaxConcurrency = 16
)

// NewQueryable returns a queryable that iterates through all the tenant IDs
Expand All @@ -36,8 +38,8 @@ const (
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier)
func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
}

func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
Expand Down Expand Up @@ -79,29 +81,41 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable {
func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
return &mergeQueryable{
idLabelName: idLabelName,
maxConcurrent: maxConcurrent,
callback: callback,
byPassWithSingleQuerier: byPassWithSingleQuerier,

tenantsPerQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_federated_tenants_per_query",
Help: "Number of tenants per query.",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
}),
}
}

type mergeQueryable struct {
idLabelName string
maxConcurrent int
byPassWithSingleQuerier bool
callback MergeQuerierCallback
tenantsPerQuery prometheus.Histogram
}

// Querier returns a new mergeQuerier, which aggregates results from multiple
// underlying queriers into a single result.
func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) {
return &mergeQuerier{
idLabelName: m.idLabelName,
maxConcurrent: m.maxConcurrent,
mint: mint,
maxt: maxt,
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
callback: m.callback,
tenantsPerQuery: m.tenantsPerQuery,
}, nil
}

Expand All @@ -112,11 +126,13 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively
type mergeQuerier struct {
idLabelName string
mint, maxt int64
callback MergeQuerierCallback
idLabelName string
mint, maxt int64
callback MergeQuerierCallback
maxConcurrent int

byPassWithSingleQuerier bool
tenantsPerQuery prometheus.Histogram
}

// LabelValues returns all potential values for a label name. It is not safe
Expand All @@ -130,6 +146,8 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor
return nil, nil, err
}

m.tenantsPerQuery.Observe(float64(len(ids)))

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].LabelValues(ctx, name, hints, matchers...)
Expand Down Expand Up @@ -169,6 +187,8 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints
return nil, nil, err
}

m.tenantsPerQuery.Observe(float64(len(ids)))

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].LabelNames(ctx, hints, matchers...)
Expand Down Expand Up @@ -257,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context,
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -309,6 +329,8 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
return storage.ErrSeriesSet(err)
}

m.tenantsPerQuery.Observe(float64(len(ids)))

// by pass when only single querier is returned
if m.byPassWithSingleQuerier && len(queriers) == 1 {
return queriers[0].Select(ctx, sortSeries, hints, matchers...)
Expand Down Expand Up @@ -352,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
return nil
}

if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil {
if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil {
return storage.ErrSeriesSet(err)
}

Expand Down
Loading
Loading