diff --git a/go.mod b/go.mod
index b9b97bba4a3..a443235ef24 100644
--- a/go.mod
+++ b/go.mod
@@ -284,7 +284,7 @@ require (
 )
 
 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241120160701-db938c3ceac8
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241129155652-bcbf77399208
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
diff --git a/go.sum b/go.sum
index 5e1a8148ceb..88e1b58d0d5 100644
--- a/go.sum
+++ b/go.sum
@@ -1276,8 +1276,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
 github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20241120160701-db938c3ceac8 h1:y9kz0U/FgKalDnzS+2TbTdKytdvZrzqqX4eh3I26vZA=
-github.com/grafana/mimir-prometheus v0.0.0-20241120160701-db938c3ceac8/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
+github.com/grafana/mimir-prometheus v0.0.0-20241129155652-bcbf77399208 h1:DxbORcpCvHpnGkt29R6LnxN0o4NDknBOlumUiNEhQoE=
+github.com/grafana/mimir-prometheus v0.0.0-20241129155652-bcbf77399208/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go
index 9f40a2c4999..d8a866e64d7 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go
@@ -199,6 +199,11 @@ type Options struct {
 	// OOO Native Histogram ingestion is complete.
 	EnableOOONativeHistograms bool
 
+	// EnableBiggerOOOBlockForOldSamples enables building 24h blocks for the OOO samples
+	// that belong to the previous day. This is in-line with Mimir maintaining 24h blocks
+	// for the previous days.
+	EnableBiggerOOOBlockForOldSamples bool
+
 	// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
 	// This can change during run-time, so this value from here should only be used
 	// while initialising.
@@ -1503,14 +1508,36 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
 
 	meta := &BlockMeta{}
 	meta.Compaction.SetOutOfOrder()
-	for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
-		mint, maxt := t, t+blockSize
+	runCompaction := func(mint, maxt int64) error {
 		// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
 		uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
 		if err != nil {
-			return nil, err
+			return err
 		}
 		ulids = append(ulids, uids...)
+		return nil
+	}
+
+	oooStart := oooHeadMint
+	if db.opts.EnableBiggerOOOBlockForOldSamples {
+		day := 24 * time.Hour.Milliseconds()
+		maxtFor24hBlock := day * (db.Head().MaxTime() / day)
+
+		// 24h blocks for data that is for the previous days
+		for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
+			if err := runCompaction(t, t+day); err != nil {
+				return nil, err
+			}
+		}
+
+		if oooStart < maxtFor24hBlock {
+			oooStart = maxtFor24hBlock
+		}
+	}
+	for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
+		if err := runCompaction(t, t+blockSize); err != nil {
+			return nil, err
+		}
 	}
 
 	if len(ulids) == 0 {
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
index a9871325130..a2aafbaa0c4 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"maps"
 	"math"
 	"runtime"
 	"slices"
@@ -32,6 +33,8 @@ import (
 	"github.com/prometheus/prometheus/storage"
 )
 
+const exponentialSliceGrowthFactor = 2
+
 var allPostingsKey = labels.Label{}
 
 // AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -55,15 +58,33 @@ var ensureOrderBatchPool = sync.Pool{
 // EnsureOrder() must be called once before any reads are done. This allows for quick
 // unordered batch fills on startup.
 type MemPostings struct {
-	mtx     sync.RWMutex
-	m       map[string]map[string][]storage.SeriesRef
+	mtx sync.RWMutex
+
+	// m holds the postings lists for each label-value pair, indexed first by label name, and then by label value.
+	//
+	// mtx must be held when interacting with m (the appropriate one for reading or writing).
+	// It is safe to retain a reference to a postings list after releasing the lock.
+	//
+	// BUG: There's currently a data race in addFor, which might modify the tail of the postings list:
+	// https://github.com/prometheus/prometheus/issues/15317
+	m map[string]map[string][]storage.SeriesRef
+
+	// lvs holds the label values for each label name.
+	// lvs[name] is essentially an unsorted append-only list of all keys in m[name].
+	// mtx must be held when interacting with lvs.
+	// Since it's append-only, it is safe to use the label values slice after releasing the lock.
+	lvs map[string][]string
+
 	ordered bool
 }
 
+const defaultLabelNamesMapSize = 512
+
 // NewMemPostings returns a memPostings that's ready for reads and writes.
 func NewMemPostings() *MemPostings {
 	return &MemPostings{
-		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		m:       make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
+		lvs:     make(map[string][]string, defaultLabelNamesMapSize),
 		ordered: true,
 	}
 }
@@ -72,7 +93,8 @@ func NewMemPostings() *MemPostings {
 // until EnsureOrder() was called once.
 func NewUnorderedMemPostings() *MemPostings {
 	return &MemPostings{
-		m:       make(map[string]map[string][]storage.SeriesRef, 512),
+		m:       make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
+		lvs:     make(map[string][]string, defaultLabelNamesMapSize),
 		ordered: false,
 	}
 }
@@ -80,16 +102,19 @@ func NewUnorderedMemPostings() *MemPostings {
 // Symbols returns an iterator over all unique name and value strings, in order.
 func (p *MemPostings) Symbols() StringIter {
 	p.mtx.RLock()
+	// Make a quick clone of the map to avoid holding the lock while iterating.
+	// It's safe to use the values of the map after releasing the lock, as they're append-only slices.
+	lvs := maps.Clone(p.lvs)
+	p.mtx.RUnlock()
 
 	// Add all the strings to a map to de-duplicate.
-	symbols := make(map[string]struct{}, 512)
-	for n, e := range p.m {
+	symbols := make(map[string]struct{}, defaultLabelNamesMapSize)
+	for n, labelValues := range lvs {
 		symbols[n] = struct{}{}
-		for v := range e {
+		for _, v := range labelValues {
 			symbols[v] = struct{}{}
 		}
 	}
-	p.mtx.RUnlock()
 
 	res := make([]string, 0, len(symbols))
 	for k := range symbols {
@@ -145,13 +170,14 @@ func (p *MemPostings) LabelNames() []string {
 // LabelValues returns label values for the given name.
 func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
 	p.mtx.RLock()
-	defer p.mtx.RUnlock()
+	values := p.lvs[name]
+	p.mtx.RUnlock()
 
-	values := make([]string, 0, len(p.m[name]))
-	for v := range p.m[name] {
-		values = append(values, v)
-	}
-	return values
+	// The slice from p.lvs[name] is shared between all readers, and it is append-only.
+	// Since it's shared, we need to make a copy of it before returning it to make
+	// sure that no caller modifies the original one by sorting it or filtering it.
+	// Since it's append-only, we can do this while not holding the mutex anymore.
+	return slices.Clone(values)
 }
 
 // PostingsStats contains cardinality based statistics for postings.
@@ -294,6 +320,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 
+	affectedLabelNames := map[string]struct{}{}
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]
 		repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +333,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 			p.m[l.Name][l.Value] = repl
 		} else {
 			delete(p.m[l.Name], l.Value)
-			// Delete the key if we removed all values.
-			if len(p.m[l.Name]) == 0 {
-				delete(p.m, l.Name)
-			}
+			affectedLabelNames[l.Name] = struct{}{}
 		}
 	}
 
@@ -323,22 +347,52 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 		// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers)
 		// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
 		if i%512 == 0 {
-			p.mtx.Unlock()
-			// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
-			// it wouldn't ensure use that readers actually were able to get the read lock,
-			// because if there are writes waiting on same mutex, readers won't be able to get it.
-			// So we just grab one RLock ourselves.
-			p.mtx.RLock()
-			// We shouldn't wait here, because we would be blocking a potential write for no reason.
-			// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
-			p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
-			// Now we can wait a little bit just to increase the chance of a reader getting the lock.
-			// If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
-			time.Sleep(time.Millisecond)
-			p.mtx.Lock()
+			p.unlockWaitAndLockAgain()
 		}
 	}
 	process(allPostingsKey)
+
+	// Now we need to update the label values slices.
+	i = 0
+	for name := range affectedLabelNames {
+		i++
+		// From time to time we want some readers to go through and read their postings.
+		if i%512 == 0 {
+			p.unlockWaitAndLockAgain()
+		}
+
+		if len(p.m[name]) == 0 {
+			// Delete the label name key if we deleted all values.
+			delete(p.m, name)
+			delete(p.lvs, name)
+			continue
+		}
+
+		// Create the new slice with enough room to grow without reallocating.
+		// We have deleted values here, so there's definitely some churn, so be prepared for it.
+		lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
+		for v := range p.m[name] {
+			lvs = append(lvs, v)
+		}
+		p.lvs[name] = lvs
+	}
+}
+
+// unlockWaitAndLockAgain will unlock an already locked p.mtx.Lock() and then wait a little bit before locking it again,
+// letting the RLock()-waiting goroutines get the lock.
+func (p *MemPostings) unlockWaitAndLockAgain() {
+	p.mtx.Unlock()
+	// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
+	// it wouldn't ensure us that readers actually were able to get the read lock,
+	// because if there are writes waiting on same mutex, readers won't be able to get it.
+	// So we just grab one RLock ourselves.
+	p.mtx.RLock()
+	// We shouldn't wait here, because we would be blocking a potential write for no reason.
+	// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
+	p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
+	// Now we can wait a little bit just to increase the chance of a reader getting the lock.
+	time.Sleep(time.Millisecond)
+	p.mtx.Lock()
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
@@ -370,7 +424,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
 
 func appendWithExponentialGrowth[T any](a []T, v T) []T {
 	if cap(a) < len(a)+1 {
-		newList := make([]T, len(a), len(a)*2+1)
+		newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
 		copy(newList, a)
 		a = newList
 	}
@@ -383,7 +437,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 		nm = map[string][]storage.SeriesRef{}
 		p.m[l.Name] = nm
 	}
-	list := appendWithExponentialGrowth(nm[l.Value], id)
+	vm, ok := nm[l.Value]
+	if !ok {
+		p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
+	}
+	list := appendWithExponentialGrowth(vm, id)
 	nm[l.Value] = list
 
 	if !p.ordered {
@@ -402,25 +460,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 }
 
 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
-	// We'll copy the values into a slice and then match over that,
+	// We'll take the label values slice and then match over that,
 	// this way we don't need to hold the mutex while we're matching,
 	// which can be slow (seconds) if the match function is a huge regex.
 	// Holding this lock prevents new series from being added (slows down the write path)
 	// and blocks the compaction process.
-	vals := p.labelValues(name)
-	for i, count := 0, 1; i < len(vals); count++ {
-		if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
+	//
+	// We just need to make sure we don't modify the slice we took,
+	// so we'll append matching values to a different one.
+	p.mtx.RLock()
+	readOnlyLabelValues := p.lvs[name]
+	p.mtx.RUnlock()
+
+	vals := make([]string, 0, len(readOnlyLabelValues))
+	for i, v := range readOnlyLabelValues {
+		if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
 			return ErrPostings(ctx.Err())
 		}
 
-		if match(vals[i]) {
-			i++
-			continue
+		if match(v) {
+			vals = append(vals, v)
 		}
-
-		// Didn't match, bring the last value to this position, make the slice shorter and check again.
-		// The order of the slice doesn't matter as it comes from a map iteration.
-		vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
 	}
 
 	// If none matched (or this label had no values), no need to grab the lock again.
@@ -463,27 +523,6 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string
 	return Merge(ctx, its...)
 }
 
-// labelValues returns a slice of label values for the given label name.
-// It will take the read lock.
-func (p *MemPostings) labelValues(name string) []string {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	e := p.m[name]
-	if len(e) == 0 {
-		return nil
-	}
-
-	vals := make([]string, 0, len(e))
-	for v, srs := range e {
-		if len(srs) > 0 {
-			vals = append(vals, v)
-		}
-	}
-
-	return vals
-}
-
 // ExpandPostings returns the postings expanded as a slice.
 func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
 	for p.Next() {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 0c2e377a11b..4d1095e17bd 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1016,7 +1016,7 @@ github.com/prometheus/exporter-toolkit/web
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241120160701-db938c3ceac8
+# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241129155652-bcbf77399208
 ## explicit; go 1.22.0
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1680,7 +1680,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
 sigs.k8s.io/yaml/goyaml.v3
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241120160701-db938c3ceac8
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241129155652-bcbf77399208
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b
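
For context on the db.go change above: when EnableBiggerOOOBlockForOldSamples is set, compactOOO first cuts 24h-aligned blocks for out-of-order data belonging to previous days, and only the remainder (the current day relative to the head's max time) is cut into the usual blockSize-aligned blocks. The following standalone sketch mirrors that arithmetic with made-up timestamps and a hypothetical 2h block size; it is an illustration, not code from the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	day := 24 * time.Hour.Milliseconds()      // same day constant as in compactOOO
	blockSize := 2 * time.Hour.Milliseconds() // hypothetical OOO chunk range

	// Hypothetical head state: OOO samples span hours 30..52, the in-order head reaches hour 53.
	oooHeadMint := 30 * time.Hour.Milliseconds()
	oooHeadMaxt := 52 * time.Hour.Milliseconds()
	headMaxt := 53 * time.Hour.Milliseconds()

	// Everything before the start of the head's current day is "old" and gets 24h blocks.
	maxtFor24hBlock := day * (headMaxt / day)
	for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
		fmt.Printf("24h block: [%d, %d)\n", t, t+day)
	}

	// The rest is cut into regular blockSize-aligned blocks, as before.
	oooStart := oooHeadMint
	if oooStart < maxtFor24hBlock {
		oooStart = maxtFor24hBlock
	}
	for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
		fmt.Printf("2h block:  [%d, %d)\n", t, t+blockSize)
	}
}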
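The postings.go changes hinge on the new lvs map being append-only: readers (Symbols, LabelValues, PostingsForLabelMatching) copy the slice header under the read lock and keep using it after unlocking, while writers only ever append values the first time they are seen for a label name. A minimal standalone sketch of that pattern, with simplified names and shapes that are not the vendored types:

package main

import (
	"fmt"
	"slices"
	"sync"
)

// labelValuesIndex is a toy stand-in for the new MemPostings fields:
// m plays the role of the postings map, lvs the append-only value list.
type labelValuesIndex struct {
	mtx sync.RWMutex
	m   map[string]map[string]struct{}
	lvs map[string][]string
}

// add mirrors addFor: a value is appended to lvs only the first time it is
// seen for a name, so already-published slice elements are never rewritten.
func (ix *labelValuesIndex) add(name, value string) {
	ix.mtx.Lock()
	defer ix.mtx.Unlock()
	vs, ok := ix.m[name]
	if !ok {
		vs = map[string]struct{}{}
		ix.m[name] = vs
	}
	if _, ok := vs[value]; !ok {
		vs[value] = struct{}{}
		ix.lvs[name] = append(ix.lvs[name], value)
	}
}

// labelValues mirrors MemPostings.LabelValues: take the slice header under the
// read lock, release the lock, then clone outside the critical section so the
// caller can sort or filter the result without touching the shared slice.
func (ix *labelValuesIndex) labelValues(name string) []string {
	ix.mtx.RLock()
	values := ix.lvs[name]
	ix.mtx.RUnlock()
	return slices.Clone(values)
}

func main() {
	ix := &labelValuesIndex{m: map[string]map[string]struct{}{}, lvs: map[string][]string{}}
	ix.add("job", "api")
	ix.add("job", "db")
	ix.add("job", "api") // duplicate, not appended again
	fmt.Println(ix.labelValues("job")) // [api db]
}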
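The exponentialSliceGrowthFactor constant only names the growth factor already used by appendWithExponentialGrowth: when a postings or label-values slice is full, a new backing array of roughly double the capacity is allocated and the old elements are copied, keeping appends amortized constant-time. A small standalone demonstration of the resulting capacity schedule (the helper is copied here purely for illustration):

package main

import "fmt"

const exponentialSliceGrowthFactor = 2

func appendWithExponentialGrowth[T any](a []T, v T) []T {
	if cap(a) < len(a)+1 {
		newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
		copy(newList, a)
		a = newList
	}
	return append(a, v)
}

func main() {
	var a []int
	for i := 0; i < 8; i++ {
		a = appendWithExponentialGrowth(a, i)
		fmt.Printf("len=%d cap=%d\n", len(a), cap(a))
	}
	// Capacities follow 1, 3, 3, 7, 7, 7, 7, 15: roughly doubling whenever the
	// slice fills up, independent of the Go runtime's own growth policy.
}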