Skip to content

Commit

Permalink
query: add partition labels flag
Browse files Browse the repository at this point in the history
The distributed engine decides when to push down certain operations by
checking if the external labels are still present, i.e. we can push down
a binary operation if its vector matching includes all external labels.
This is great but if you have multiple external labels that are
irrelevant for the partition this is problematic since query authors
must be aware of those irrelevant labels and must incorporate them into
their queries.
This PR attempts to solve that by giving an option to focus on the
labels that are relevant for the partition.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Sep 11, 2024
1 parent 97710f4 commit 83e8df7
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func registerQuery(app *extkingpin.App) {

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations.").Strings()

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -326,6 +327,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*storeResponseTimeout),
*queryConnMetricLabels,
*queryReplicaLabels,
*queryPartitionLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
Expand Down Expand Up @@ -407,6 +409,7 @@ func runQuery(
storeResponseTimeout time.Duration,
queryConnMetricLabels []string,
queryReplicaLabels []string,
queryPartitionLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
Expand Down Expand Up @@ -682,6 +685,7 @@ func runQuery(
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
PartitionLabels: queryPartitionLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
Expand Down
12 changes: 12 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,18 @@ Flags:
--query.partial-response Enable partial response for queries if
no partial_response param is specified.
--no-query.partial-response for disabling.
--query.partition-label=QUERY.PARTITION-LABEL ...
Labels that partition the leaf queriers. This
is used to scope down the labelsets of leaf
queriers when using the distributed query mode.
If set, these labels must form a partition
of the leaf queriers. Partition labels must
not intersect with replica labels. Every TSDB
of a leaf querier must have these labels.
This is useful when there are multiple external
labels that are irrelevant for the partition as
it allows the distributed engine to ignore them
for some optimizations.
--query.promql-engine=prometheus
Default PromQL engine to use.
--query.replica-label=QUERY.REPLICA-LABEL ...
Expand Down
16 changes: 13 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
type Opts struct {
AutoDownsample bool
ReplicaLabels []string
PartitionLabels []string
Timeout time.Duration
EnablePartialResponse bool
}
Expand Down Expand Up @@ -118,7 +119,7 @@ func (r *remoteEngine) MinT() int64 {
hashBuf = make([]byte, 0, 128)
highestMintByLabelSet = make(map[uint64]int64)
)
for _, lset := range r.infosWithoutReplicaLabels() {
for _, lset := range r.adjustedInfos() {
key, _ := labelpb.LabelpbLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf)
lsetMinT, ok := highestMintByLabelSet[key]
if !ok {
Expand Down Expand Up @@ -152,16 +153,22 @@ func (r *remoteEngine) MaxT() int64 {

func (r *remoteEngine) LabelSets() []labels.Labels {
r.labelSetsOnce.Do(func() {
r.labelSets = r.infosWithoutReplicaLabels().LabelSets()
r.labelSets = r.adjustedInfos().LabelSets()
})
return r.labelSets
}

func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
// adjustedInfos strips out replica labels and scopes the remaining labels
// onto the partition labels if they are set.
func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
replicaLabelSet := make(map[string]struct{})
for _, lbl := range r.opts.ReplicaLabels {
replicaLabelSet[lbl] = struct{}{}
}
partitionLabelsSet := make(map[string]struct{})
for _, lbl := range r.opts.PartitionLabels {
partitionLabelsSet[lbl] = struct{}{}
}

// Strip replica labels from the result.
infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos))
Expand All @@ -172,6 +179,9 @@ func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
if _, ok := replicaLabelSet[lbl.Name]; ok {
continue
}
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
continue
}
builder.Add(lbl.Name, lbl.Value)
}
infos = append(infos, infopb.NewTSDBInfo(
Expand Down
22 changes: 17 additions & 5 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ func TestRemoteEngine_Warnings(t *testing.T) {

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
expected []labels.Labels
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
partitionLabels []string
expected []labels.Labels
}{
{
name: "empty label sets",
Expand Down Expand Up @@ -103,13 +104,24 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
replicaLabels: []string{"a", "b"},
expected: []labels.Labels{labels.FromStrings("c", "2")},
},
{
name: "non-empty label sets with partition labels",
tsdbInfos: []*infopb.TSDBInfo{
{
Labels: labelSetFromStrings("a", "1", "c", "2"),
},
},
partitionLabels: []string{"a"},
expected: []labels.Labels{labels.FromStrings("a", "1")},
},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
client := NewClient(nil, "", testCase.tsdbInfos)
engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{
ReplicaLabels: testCase.replicaLabels,
ReplicaLabels: testCase.replicaLabels,
PartitionLabels: testCase.partitionLabels,
})

testutil.Equals(t, testCase.expected, engine.LabelSets())
Expand Down

0 comments on commit 83e8df7

Please sign in to comment.