-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
shard_info.go
112 lines (91 loc) · 2.31 KB
/
shard_info.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package storepb
import (
"sync"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)
// sep holds the one-byte delimiter appended after every label name and every
// label value that goes into the shard hash input. The byte 0xff never occurs
// in valid UTF-8 text.
var sep = []byte{0xff}
// ShardMatcher decides whether a series belongs to one shard of a sharded
// query by hashing a subset of the series' labels.
type ShardMatcher struct {
	// isSharded is false when sharding is disabled; MatchesZLabels then
	// accepts every series.
	isSharded bool

	// by selects how shardingLabelset is interpreted: when true only labels
	// contained in the set are hashed, when false only labels outside the set
	// are hashed.
	by bool

	// shardingLabelset is the set of label names consulted together with by.
	shardingLabelset map[string]struct{}

	// totalShards is the modulus applied to the label hash.
	totalShards int64

	// shardIndex is the remainder (hash % totalShards) this matcher accepts.
	shardIndex int64

	// buf points at the scratch buffer in which the hash input is assembled.
	buf *[]byte

	// buffers is the pool buf was taken from; Close puts buf back into it.
	buffers *sync.Pool
}
// IsSharded reports whether the matcher filters series by shard.
//
// A nil matcher is reported as not sharded instead of panicking. This mirrors
// MatchesZLabels, which treats a nil receiver as "match everything".
func (s *ShardMatcher) IsSharded() bool {
	return s != nil && s.isSharded
}
// Close hands the matcher's scratch buffer back to the pool it came from.
//
// It is a no-op for a nil matcher and for a matcher that holds no pool (such
// as an unsharded one). Calling Close more than once is safe: only the first
// call returns the buffer.
func (s *ShardMatcher) Close() {
	if s == nil || s.buffers == nil {
		return
	}
	s.buffers.Put(s.buf)
	// Forget the pool so a repeated Close cannot Put the same buffer twice.
	// A buffer that sits in the pool twice could be handed out to two
	// matchers at once, which would then overwrite each other's hash input.
	s.buffers = nil
}
// MatchesZLabels reports whether the series described by zLabels belongs to
// this matcher's shard.
//
// A nil or unsharded matcher accepts every series. Otherwise the labels that
// take part in sharding are concatenated as name, sep, value, sep into the
// scratch buffer, the buffer is hashed with xxhash, and the series matches
// when the hash modulo totalShards equals shardIndex.
func (s *ShardMatcher) MatchesZLabels(zLabels []labelpb.ZLabel) bool {
	if s == nil || !s.isSharded {
		return true
	}

	// Reuse the scratch buffer's capacity, starting from an empty slice.
	key := (*s.buf)[:0]
	for i := range zLabels {
		name := zLabels[i].Name
		switch name {
		case "__name__", "le":
			// The metric name and the le label never take part in sharding.
			continue
		}
		if !shardByLabel(s.shardingLabelset, zLabels[i], s.by) {
			continue
		}
		key = append(key, name...)
		key = append(key, sep[0])
		key = append(key, zLabels[i].Value...)
		key = append(key, sep[0])
	}
	// Store the slice back so that any growth is kept for the next call.
	*s.buf = key

	return xxhash.Sum64(key)%uint64(s.totalShards) == uint64(s.shardIndex)
}
// MatchesLabels is the labels.Labels counterpart of MatchesZLabels: it
// converts lbls with labelpb.ZLabelsFromPromLabels and returns the result of
// MatchesZLabels for the converted labels.
func (s *ShardMatcher) MatchesLabels(lbls labels.Labels) bool {
	zLabels := labelpb.ZLabelsFromPromLabels(lbls)
	return s.MatchesZLabels(zLabels)
}
// shardByLabel reports whether zlabel takes part in the shard hash.
//
// When groupingBy is true a label participates only if its name is in
// labelSet; when groupingBy is false ("without" grouping) it participates
// only if its name is absent from labelSet.
func shardByLabel(labelSet map[string]struct{}, zlabel labelpb.ZLabel, groupingBy bool) bool {
	_, inSet := labelSet[zlabel.Name]
	// by + present and without + absent are the two accepting cases, i.e. the
	// flag and the membership result must agree.
	return groupingBy == inSet
}
// Matcher builds a ShardMatcher from the shard description m.
//
// When m is nil or m.TotalShards is less than 1 the returned matcher is
// unsharded and holds no buffer. Otherwise the matcher takes a scratch buffer
// from buffers; call Close on the matcher to hand the buffer back.
//
// buffers is expected to yield *[]byte values. If buffers is nil, or Get does
// not return a non-nil *[]byte (for example because the pool has no New
// function), a fresh buffer is allocated instead of panicking.
func (m *ShardInfo) Matcher(buffers *sync.Pool) *ShardMatcher {
	if m == nil || m.TotalShards < 1 {
		return &ShardMatcher{
			isSharded: false,
		}
	}

	var buf *[]byte
	if buffers != nil {
		// The comma-ok form leaves buf nil when the pool yields nil or a
		// value of another type.
		buf, _ = buffers.Get().(*[]byte)
	}
	if buf == nil {
		buf = new([]byte)
	}

	return &ShardMatcher{
		isSharded:        true,
		buf:              buf,
		buffers:          buffers,
		shardingLabelset: m.labelSet(),
		by:               m.By,
		totalShards:      m.TotalShards,
		shardIndex:       m.ShardIndex,
	}
}
// labelSet returns the names in m.Labels as a set.
//
// It returns nil for a nil receiver and a non-nil, possibly empty, map
// otherwise.
func (m *ShardInfo) labelSet() map[string]struct{} {
	if m == nil {
		return nil
	}

	// Ranging over a nil or empty Labels slice is a no-op, so no separate
	// guard is needed; the length is used as a size hint for the map.
	set := make(map[string]struct{}, len(m.Labels))
	for _, name := range m.Labels {
		set[name] = struct{}{}
	}
	return set
}