Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add partition labels flag #7722

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func registerQuery(app *extkingpin.App) {

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -326,6 +327,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*storeResponseTimeout),
*queryConnMetricLabels,
*queryReplicaLabels,
*queryPartitionLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
Expand Down Expand Up @@ -407,6 +409,7 @@ func runQuery(
storeResponseTimeout time.Duration,
queryConnMetricLabels []string,
queryReplicaLabels []string,
queryPartitionLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
Expand Down Expand Up @@ -682,6 +685,7 @@ func runQuery(
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
PartitionLabels: queryPartitionLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
Expand Down
13 changes: 13 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,19 @@ Flags:
--query.partial-response Enable partial response for queries if
no partial_response param is specified.
--no-query.partial-response for disabling.
--query.partition-label=QUERY.PARTITION-LABEL ...
Labels that partition the leaf queriers. This
is used to scope down the labelsets of leaf
queriers when using the distributed query mode.
If set, these labels must form a partition
of the leaf queriers. Partition labels must
not intersect with replica labels. Every TSDB
of a leaf querier must have these labels.
This is useful when there are multiple external
labels that are irrelevant for the partition as
it allows the distributed engine to ignore them
for some optimizations. If this is empty then
all labels are used as partition labels.
--query.promql-engine=prometheus
Default PromQL engine to use.
--query.replica-label=QUERY.REPLICA-LABEL ...
Expand Down
16 changes: 13 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
type Opts struct {
AutoDownsample bool
ReplicaLabels []string
PartitionLabels []string
Timeout time.Duration
EnablePartialResponse bool
}
Expand Down Expand Up @@ -118,7 +119,7 @@ func (r *remoteEngine) MinT() int64 {
hashBuf = make([]byte, 0, 128)
highestMintByLabelSet = make(map[uint64]int64)
)
for _, lset := range r.infosWithoutReplicaLabels() {
for _, lset := range r.adjustedInfos() {
key, _ := labelpb.LabelpbLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf)
lsetMinT, ok := highestMintByLabelSet[key]
if !ok {
Expand Down Expand Up @@ -152,16 +153,22 @@ func (r *remoteEngine) MaxT() int64 {

func (r *remoteEngine) LabelSets() []labels.Labels {
r.labelSetsOnce.Do(func() {
r.labelSets = r.infosWithoutReplicaLabels().LabelSets()
r.labelSets = r.adjustedInfos().LabelSets()
})
return r.labelSets
}

func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
// adjustedInfos strips out replica labels and scopes the remaining labels
// onto the partition labels if they are set.
func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
replicaLabelSet := make(map[string]struct{})
for _, lbl := range r.opts.ReplicaLabels {
replicaLabelSet[lbl] = struct{}{}
}
partitionLabelsSet := make(map[string]struct{})
for _, lbl := range r.opts.PartitionLabels {
partitionLabelsSet[lbl] = struct{}{}
}

// Strip replica labels from the result.
infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos))
Expand All @@ -172,6 +179,9 @@ func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
if _, ok := replicaLabelSet[lbl.Name]; ok {
continue
}
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
continue
}
builder.Add(lbl.Name, lbl.Value)
}
infos = append(infos, infopb.NewTSDBInfo(
Expand Down
22 changes: 17 additions & 5 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ func TestRemoteEngine_Warnings(t *testing.T) {

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
expected []labels.Labels
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
partitionLabels []string
expected []labels.Labels
}{
{
name: "empty label sets",
Expand Down Expand Up @@ -103,13 +104,24 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
replicaLabels: []string{"a", "b"},
expected: []labels.Labels{labels.FromStrings("c", "2")},
},
{
name: "non-empty label sets with partition labels",
tsdbInfos: []*infopb.TSDBInfo{
{
Labels: labelSetFromStrings("a", "1", "c", "2"),
},
},
partitionLabels: []string{"a"},
expected: []labels.Labels{labels.FromStrings("a", "1")},
},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
client := NewClient(nil, "", testCase.tsdbInfos)
engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{
ReplicaLabels: testCase.replicaLabels,
ReplicaLabels: testCase.replicaLabels,
PartitionLabels: testCase.partitionLabels,
})

testutil.Equals(t, testCase.expected, engine.LabelSets())
Expand Down
Loading