diff --git a/go.mod b/go.mod
index 0b4e40dee56..efe525b42cf 100644
--- a/go.mod
+++ b/go.mod
@@ -284,7 +284,7 @@ require (
 )
 
 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241129155655-15150363f256
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
diff --git a/go.sum b/go.sum
index befa6fa570a..4f5770c35d8 100644
--- a/go.sum
+++ b/go.sum
@@ -1272,8 +1272,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
 github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef h1:DH1rR0nzlOIPXvLDQjmlyEtr4g+rqrN/iklns+m3w0U=
-github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
+github.com/grafana/mimir-prometheus v0.0.0-20241129155655-15150363f256 h1:6NSgI0ndAIeImaFK1Ln7J3rFyh0Cz5iy5TJOGbWDpMs=
+github.com/grafana/mimir-prometheus v0.0.0-20241129155655-15150363f256/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go
index 9f40a2c4999..d8a866e64d7 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go
@@ -199,6 +199,11 @@ type Options struct {
 	// OOO Native Histogram ingestion is complete.
 	EnableOOONativeHistograms bool
 
+	// EnableBiggerOOOBlockForOldSamples enables building 24h blocks for the OOO samples
+	// that belong to the previous day. This is in-line with Mimir maintaining 24h blocks
+	// for the previous days.
+	EnableBiggerOOOBlockForOldSamples bool
+
 	// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
 	// This can change during run-time, so this value from here should only be used
 	// while initialising.
@@ -1503,14 +1508,36 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
 	meta := &BlockMeta{}
 	meta.Compaction.SetOutOfOrder()
 
-	for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
-		mint, maxt := t, t+blockSize
+	runCompaction := func(mint, maxt int64) error {
 		// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
 		uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
 		if err != nil {
-			return nil, err
+			return err
 		}
 		ulids = append(ulids, uids...)
+		return nil
+	}
+
+	oooStart := oooHeadMint
+	if db.opts.EnableBiggerOOOBlockForOldSamples {
+		day := 24 * time.Hour.Milliseconds()
+		maxtFor24hBlock := day * (db.Head().MaxTime() / day)
+
+		// 24h blocks for data that is for the previous days
+		for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
+			if err := runCompaction(t, t+day); err != nil {
+				return nil, err
+			}
+		}
+
+		if oooStart < maxtFor24hBlock {
+			oooStart = maxtFor24hBlock
+		}
+	}
+	for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
+		if err := runCompaction(t, t+blockSize); err != nil {
+			return nil, err
+		}
 	}
 
 	if len(ulids) == 0 {
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
index 15536c69c97..38aa3fb34a6 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
@@ -32,6 +32,8 @@ import (
 	"github.com/prometheus/prometheus/storage"
 )
 
+const exponentialSliceGrowthFactor = 2
+
 var allPostingsKey = labels.Label{}
 
 // AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -57,6 +59,7 @@ var ensureOrderBatchPool = sync.Pool{
 type MemPostings struct {
 	mtx     sync.RWMutex
 	m       map[string]map[string][]storage.SeriesRef
+	lvs     map[string][]string
 	ordered bool
 }
 
@@ -64,6 +67,7 @@ type MemPostings struct {
 func NewMemPostings() *MemPostings {
 	return &MemPostings{
 		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		lvs:     make(map[string][]string, 512),
 		ordered: true,
 	}
 }
@@ -73,6 +77,7 @@ func NewMemPostings() *MemPostings {
 func NewUnorderedMemPostings() *MemPostings {
 	return &MemPostings{
 		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		lvs:     make(map[string][]string, 512),
 		ordered: false,
 	}
 }
@@ -83,9 +88,9 @@ func (p *MemPostings) Symbols() StringIter {
 	// Add all the strings to a map to de-duplicate.
 	symbols := make(map[string]struct{}, 512)
-	for n, e := range p.m {
+	for n, lvs := range p.lvs {
 		symbols[n] = struct{}{}
-		for v := range e {
+		for _, v := range lvs {
 			symbols[v] = struct{}{}
 		}
 	}
@@ -145,13 +150,14 @@ func (p *MemPostings) LabelNames() []string {
 // LabelValues returns label values for the given name.
 func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
 	p.mtx.RLock()
-	defer p.mtx.RUnlock()
+	values := p.lvs[name]
+	p.mtx.RUnlock()
 
-	values := make([]string, 0, len(p.m[name]))
-	for v := range p.m[name] {
-		values = append(values, v)
-	}
-	return values
+	// The slice from p.lvs[name] is shared between all readers, and it is append-only.
+	// Since it's shared, we need to make a copy of it before returning it to make
+	// sure that no caller modifies the original one by sorting it or filtering it.
+	// Since it's append-only, we can do this while not holding the mutex anymore.
+	return slices.Clone(values)
 }
 
 // PostingsStats contains cardinality based statistics for postings.
@@ -294,6 +300,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 
+	affectedLabelNames := map[string]struct{}{}
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]
 		repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +313,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 			p.m[l.Name][l.Value] = repl
 		} else {
 			delete(p.m[l.Name], l.Value)
-			// Delete the key if we removed all values.
-			if len(p.m[l.Name]) == 0 {
-				delete(p.m, l.Name)
-			}
+			affectedLabelNames[l.Name] = struct{}{}
 		}
 	}
@@ -339,6 +343,35 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 		}
 	}
 	process(allPostingsKey)
+
+	// Now we need to update the label values slices.
+	i = 0
+	for name := range affectedLabelNames {
+		i++
+		// Same mutex pause as above.
+		if i%512 == 0 {
+			p.mtx.Unlock()
+			p.mtx.RLock()
+			p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
+			time.Sleep(time.Millisecond)
+			p.mtx.Lock()
+		}
+
+		if len(p.m[name]) == 0 {
+			// Delete the label name key if we deleted all values.
+			delete(p.m, name)
+			delete(p.lvs, name)
+			continue
+		}
+
+		// Create the new slice with enough room to grow without reallocating.
+		// We have deleted values here, so there's definitely some churn, so be prepared for it.
+		lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
+		for v := range p.m[name] {
+			lvs = append(lvs, v)
+		}
+		p.lvs[name] = lvs
+	}
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
@@ -370,7 +403,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
 
 func appendWithExponentialGrowth[T any](a []T, v T) []T {
 	if cap(a) < len(a)+1 {
-		newList := make([]T, len(a), len(a)*2+1)
+		newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
 		copy(newList, a)
 		a = newList
 	}
@@ -383,7 +416,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 		nm = map[string][]storage.SeriesRef{}
 		p.m[l.Name] = nm
 	}
-	list := appendWithExponentialGrowth(nm[l.Value], id)
+	vm, ok := nm[l.Value]
+	if !ok {
+		p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
+	}
+	list := appendWithExponentialGrowth(vm, id)
 	nm[l.Value] = list
 
 	if !p.ordered {
@@ -402,25 +439,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 }
 
 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
-	// We'll copy the values into a slice and then match over that,
+	// We'll take the label values slice and then match over that,
 	// this way we don't need to hold the mutex while we're matching,
 	// which can be slow (seconds) if the match function is a huge regex.
 	// Holding this lock prevents new series from being added (slows down the write path)
 	// and blocks the compaction process.
-	vals := p.labelValues(name)
-	for i, count := 0, 1; i < len(vals); count++ {
-		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
+	//
+	// We just need to make sure we don't modify the slice we took,
+	// so we'll append matching values to a different one.
+	p.mtx.RLock()
+	readOnlyLabelValues := p.lvs[name]
+	p.mtx.RUnlock()
+
+	vals := make([]string, 0, len(readOnlyLabelValues))
+	for i, v := range readOnlyLabelValues {
+		if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
 			return ErrPostings(ctx.Err())
 		}
-		if match(vals[i]) {
-			i++
-			continue
+		if match(v) {
+			vals = append(vals, v)
 		}
-
-		// Didn't match, bring the last value to this position, make the slice shorter and check again.
-		// The order of the slice doesn't matter as it comes from a map iteration.
-		vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
 	}
 
 	// If none matched (or this label had no values), no need to grab the lock again.
@@ -447,27 +486,6 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
 	return Merge(ctx, its...)
 }
 
-// labelValues returns a slice of label values for the given label name.
-// It will take the read lock.
-func (p *MemPostings) labelValues(name string) []string {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	e := p.m[name]
-	if len(e) == 0 {
-		return nil
-	}
-
-	vals := make([]string, 0, len(e))
-	for v, srs := range e {
-		if len(srs) > 0 {
-			vals = append(vals, v)
-		}
-	}
-
-	return vals
-}
-
 // ExpandPostings returns the postings expanded as a slice.
 func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
 	for p.Next() {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index c37c186f33c..0df35fb4d3d 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1013,7 +1013,7 @@ github.com/prometheus/exporter-toolkit/web
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef
+# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241129155655-15150363f256
 ## explicit; go 1.22.0
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1677,7 +1677,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
 sigs.k8s.io/yaml/goyaml.v3
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241129155655-15150363f256
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b
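
Note on the vendored tsdb/db.go change above: with EnableBiggerOOOBlockForOldSamples enabled, compactOOO first cuts day-aligned 24h blocks for out-of-order data belonging to previous days (integer division day * (t / day) floors a millisecond timestamp to its day boundary, and 24h blocks are only produced up to the start of the day containing the head's max time), then falls back to the usual blockSize-aligned loop for the remainder. The following is a minimal, illustrative sketch of that boundary arithmetic only, not part of the patch; planOOOBlocks and blockRange are made-up names.

// Illustrative sketch only, not part of the patch: it mirrors the boundary
// arithmetic used by the new compactOOO path. planOOOBlocks and blockRange
// are hypothetical names.
package main

import (
	"fmt"
	"time"
)

// blockRange is half-open: [mint, maxt).
type blockRange struct{ mint, maxt int64 }

// planOOOBlocks returns the block ranges the new compactOOO logic would cut:
// day-aligned 24h blocks for OOO data older than the day containing headMaxt,
// then regular blockSize-aligned blocks for the rest.
func planOOOBlocks(oooHeadMint, oooHeadMaxt, headMaxt, blockSize int64, biggerBlocksForOldSamples bool) []blockRange {
	var ranges []blockRange

	oooStart := oooHeadMint
	if biggerBlocksForOldSamples {
		day := 24 * time.Hour.Milliseconds()
		// Integer division floors headMaxt to the start of the day that contains it.
		maxtFor24hBlock := day * (headMaxt / day)

		// 24h blocks for out-of-order data that belongs to previous days.
		for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
			ranges = append(ranges, blockRange{t, t + day})
		}
		if oooStart < maxtFor24hBlock {
			oooStart = maxtFor24hBlock
		}
	}

	// Regular blockSize-aligned blocks for the remaining range.
	for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
		ranges = append(ranges, blockRange{t, t + blockSize})
	}
	return ranges
}

func main() {
	day := 24 * time.Hour.Milliseconds()
	hour := time.Hour.Milliseconds()
	// OOO samples starting yesterday at 03:00, head currently at 10:00 today, 2h block size:
	// expect one 24h block for yesterday, then 2h blocks covering today up to the OOO max time.
	for _, r := range planOOOBlocks(day+3*hour, 2*day+9*hour, 2*day+10*hour, 2*hour, true) {
		fmt.Printf("[%d, %d)\n", r.mint, r.maxt)
	}
}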
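
Note on the vendored tsdb/index/postings.go change above: MemPostings now maintains lvs, one append-only slice of values per label name, alongside the existing postings map. Because a per-name slice is only ever appended to (and swapped for a freshly built one during Delete), a reader can take the slice header under the read lock and keep working on that snapshot after releasing the lock, which is what LabelValues (cloning it) and PostingsForLabelMatching (filtering into a new slice) do. The standalone sketch below shows the same pattern in isolation; it is illustrative only, and labelValuesIndex and its methods are made-up names.

// Illustrative sketch only, not part of the patch: it demonstrates the
// append-only shared-slice pattern the new MemPostings.lvs map relies on.
// labelValuesIndex and its methods are hypothetical.
package main

import (
	"fmt"
	"slices"
	"strings"
	"sync"
)

type labelValuesIndex struct {
	mtx sync.RWMutex
	lvs map[string][]string // label name -> append-only slice of its values
}

func (ix *labelValuesIndex) add(name, value string) {
	ix.mtx.Lock()
	defer ix.mtx.Unlock()
	// Values are only appended; existing elements are never modified in place,
	// so older snapshots of the slice stay valid for concurrent readers.
	ix.lvs[name] = append(ix.lvs[name], value)
}

// values returns a copy the caller may sort or filter freely.
func (ix *labelValuesIndex) values(name string) []string {
	ix.mtx.RLock()
	vals := ix.lvs[name] // snapshot of the slice header
	ix.mtx.RUnlock()
	// Copy outside the lock; safe because the snapshot is never mutated.
	return slices.Clone(vals)
}

// matching filters values without holding the lock while a possibly slow match runs.
func (ix *labelValuesIndex) matching(name string, match func(string) bool) []string {
	ix.mtx.RLock()
	readOnly := ix.lvs[name]
	ix.mtx.RUnlock()

	out := make([]string, 0, len(readOnly))
	for _, v := range readOnly {
		if match(v) { // never modify readOnly; append matches to a new slice instead
			out = append(out, v)
		}
	}
	return out
}

func main() {
	ix := &labelValuesIndex{lvs: map[string][]string{}}
	for _, v := range []string{"api", "web", "worker"} {
		ix.add("job", v)
	}
	fmt.Println(ix.values("job"))
	fmt.Println(ix.matching("job", func(v string) bool { return strings.HasPrefix(v, "w") }))
}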