Merge pull request #779 from grafana/weekly-r319-prometheus-15426
[weekly-r319] MemPostings: keep a map of label values slices (#15426)
colega authored Dec 2, 2024
2 parents 5bd08e6 + 358cb6e commit 4a1b9cb
Showing 1 changed file with 104 additions and 65 deletions.
169 changes: 104 additions & 65 deletions tsdb/index/postings.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/binary"
"fmt"
"maps"
"math"
"runtime"
"slices"
@@ -32,6 +33,8 @@ import (
"github.com/prometheus/prometheus/storage"
)

const exponentialSliceGrowthFactor = 2

var allPostingsKey = labels.Label{}

// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -55,15 +58,33 @@ var ensureOrderBatchPool = sync.Pool{
// EnsureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
mtx sync.RWMutex
m map[string]map[string][]storage.SeriesRef
mtx sync.RWMutex

// m holds the postings lists for each label-value pair, indexed first by label name, and then by label value.
//
// mtx must be held when interacting with m (the appropriate one for reading or writing).
// It is safe to retain a reference to a postings list after releasing the lock.
//
// BUG: There's currently a data race in addFor, which might modify the tail of the postings list:
// https://github.com/prometheus/prometheus/issues/15317
m map[string]map[string][]storage.SeriesRef

// lvs holds the label values for each label name.
// lvs[name] is essentially an unsorted append-only list of all keys in m[name].
// mtx must be held when interacting with lvs.
// Since it's append-only, it is safe to read the label values slice after releasing the lock.
lvs map[string][]string

ordered bool
}

const defaultLabelNamesMapSize = 512

// NewMemPostings returns a memPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
lvs: make(map[string][]string, defaultLabelNamesMapSize),
ordered: true,
}
}
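
To make the m/lvs relationship documented in the struct above concrete, here is a minimal, self-contained sketch of the same two-map layout (illustrative names, not part of this patch): postings indexed by label name and value, plus an unsorted append-only list of the values seen for each name.

package main

import (
    "fmt"
    "sync"
)

// labelIndex is an illustrative miniature of MemPostings' layout:
// postings[name][value] holds series refs, and values[name] is an
// unsorted, append-only list of all keys in postings[name].
type labelIndex struct {
    mtx      sync.RWMutex
    postings map[string]map[string][]uint64
    values   map[string][]string
}

func (ix *labelIndex) add(ref uint64, name, value string) {
    ix.mtx.Lock()
    defer ix.mtx.Unlock()
    vm, ok := ix.postings[name]
    if !ok {
        vm = map[string][]uint64{}
        ix.postings[name] = vm
    }
    if _, seen := vm[value]; !seen {
        // First time this value shows up for this name: record it in the
        // append-only values slice, mirroring what p.lvs does in the real code.
        ix.values[name] = append(ix.values[name], value)
    }
    vm[value] = append(vm[value], ref)
}

func main() {
    ix := &labelIndex{
        postings: map[string]map[string][]uint64{},
        values:   map[string][]string{},
    }
    ix.add(1, "job", "api")
    ix.add(2, "job", "db")
    ix.add(3, "job", "api")
    fmt.Println(ix.values["job"])          // [api db] (insertion order, unsorted)
    fmt.Println(ix.postings["job"]["api"]) // [1 3]
}

Reads can hand out values[name] (or a clone of it) without rebuilding it from the postings map on every call, which is the point of the change in this commit.
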
@@ -72,24 +93,28 @@ func NewMemPostings() *MemPostings {
// until EnsureOrder() was called once.
func NewUnorderedMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
lvs: make(map[string][]string, defaultLabelNamesMapSize),
ordered: false,
}
}

// Symbols returns an iterator over all unique name and value strings, in order.
func (p *MemPostings) Symbols() StringIter {
p.mtx.RLock()
// Make a quick clone of the map to avoid holding the lock while iterating.
// It's safe to use the values of the map after releasing the lock, as they're append-only slices.
lvs := maps.Clone(p.lvs)
p.mtx.RUnlock()

// Add all the strings to a map to de-duplicate.
symbols := make(map[string]struct{}, 512)
for n, e := range p.m {
symbols := make(map[string]struct{}, defaultLabelNamesMapSize)
for n, labelValues := range lvs {
symbols[n] = struct{}{}
for v := range e {
for _, v := range labelValues {
symbols[v] = struct{}{}
}
}
p.mtx.RUnlock()

res := make([]string, 0, len(symbols))
for k := range symbols {
@@ -145,13 +170,14 @@ func (p *MemPostings) LabelNames() []string {
// LabelValues returns label values for the given name.
func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()
values := p.lvs[name]
p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values = append(values, v)
}
return values
// The slice from p.lvs[name] is shared between all readers, and it is append-only.
// Since it's shared, we need to make a copy of it before returning it to make
// sure that no caller modifies the original one by sorting it or filtering it.
// Since it's append-only, we can do this while not holding the mutex anymore.
return slices.Clone(values)
}
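
A short usage sketch of the copy-on-read behaviour above, against the exported tsdb/index API as it appears in this file (NewMemPostings, Add, LabelValues); the series refs and labels are made up:

package main

import (
    "context"
    "fmt"
    "slices"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    p := index.NewMemPostings()
    p.Add(1, labels.FromStrings("job", "api"))
    p.Add(2, labels.FromStrings("job", "db"))

    // LabelValues returns a clone of the shared, append-only slice,
    // so callers are free to sort or filter the result.
    vals := p.LabelValues(context.Background(), "job")
    slices.Sort(vals)
    fmt.Println(vals) // [api db]
}

Sorting the returned slice is safe precisely because LabelValues hands back a clone rather than the shared lvs slice.
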

// PostingsStats contains cardinality based statistics for postings.
Expand Down Expand Up @@ -294,6 +320,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.mtx.Lock()
defer p.mtx.Unlock()

affectedLabelNames := map[string]struct{}{}
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +333,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
affectedLabelNames[l.Name] = struct{}{}
}
}

@@ -323,22 +347,52 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers).
// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
if i%512 == 0 {
p.mtx.Unlock()
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
// it wouldn't ensure that readers actually were able to get the read lock,
// because if there are writers waiting on the same mutex, readers won't be able to get it.
// So we just grab one RLock ourselves.
p.mtx.RLock()
// We shouldn't wait here, because we would be blocking a potential write for no reason.
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
// If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
time.Sleep(time.Millisecond)
p.mtx.Lock()
p.unlockWaitAndLockAgain()
}
}
process(allPostingsKey)

// Now we need to update the label values slices.
i = 0
for name := range affectedLabelNames {
i++
// From time to time we want some readers to go through and read their postings.
if i%512 == 0 {
p.unlockWaitAndLockAgain()
}

if len(p.m[name]) == 0 {
// Delete the label name key if we deleted all values.
delete(p.m, name)
delete(p.lvs, name)
continue
}

// Create the new slice with enough room to grow without reallocating.
// We have deleted values here, so there's definitely some churn; be prepared for it.
lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
for v := range p.m[name] {
lvs = append(lvs, v)
}
p.lvs[name] = lvs
}
}
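
For context, a hedged usage sketch of Delete. The hunk header above truncates the signature; this assumes the second parameter is map[labels.Label]struct{} (the label pairs carried by the deleted series), as in upstream Prometheus:

package main

import (
    "context"
    "fmt"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    p := index.NewMemPostings()
    p.Add(1, labels.FromStrings("job", "api"))
    p.Add(2, labels.FromStrings("job", "db"))

    // Drop series 2 and tell MemPostings which label pairs it carried,
    // so only those postings lists (and lvs entries) need to be rebuilt.
    deleted := map[storage.SeriesRef]struct{}{2: {}}
    affected := map[labels.Label]struct{}{
        {Name: "job", Value: "db"}: {},
    }
    p.Delete(deleted, affected)

    fmt.Println(p.LabelValues(context.Background(), "job")) // [api]
}
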

// unlockWaitAndLockAgain will unlock an already write-locked p.mtx and then wait a little bit before locking it again,
// letting the RLock()-waiting goroutines get the lock.
func (p *MemPostings) unlockWaitAndLockAgain() {
p.mtx.Unlock()
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
// it wouldn't ensure that readers actually were able to get the read lock,
// because if there are writers waiting on the same mutex, readers won't be able to get it.
// So we just grab one RLock ourselves.
p.mtx.RLock()
// We shouldn't wait here, because we would be blocking a potential write for no reason.
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
time.Sleep(time.Millisecond)
p.mtx.Lock()
}
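
The unlock/RLock/RUnlock/sleep/Lock sequence is a general technique for letting readers through during a long write; a standalone sketch of the same pattern on an arbitrary sync.RWMutex-protected map (hypothetical store type, not from this patch):

package main

import (
    "sync"
    "time"
)

type store struct {
    mtx  sync.RWMutex
    data map[int]int
}

// rewriteAll mutates every entry while periodically yielding the write lock,
// mirroring MemPostings.unlockWaitAndLockAgain: briefly take (and release) a
// read lock so we queue behind waiting readers, then sleep so they can run.
func (s *store) rewriteAll() {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    i := 0
    for k := range s.data {
        s.data[k]++
        i++
        if i%512 == 0 {
            s.mtx.Unlock()
            s.mtx.RLock()
            s.mtx.RUnlock() // intentionally empty critical section
            time.Sleep(time.Millisecond)
            s.mtx.Lock()
        }
    }
}

func main() {
    s := &store{data: make(map[int]int, 2048)}
    for i := 0; i < 2048; i++ {
        s.data[i] = i
    }
    done := make(chan struct{})
    go func() { // a concurrent reader that benefits from the pauses
        s.mtx.RLock()
        _ = len(s.data)
        s.mtx.RUnlock()
        close(done)
    }()
    s.rewriteAll()
    <-done
}
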

// Iter calls f for each postings list. It aborts if f returns an error and returns it.
@@ -370,7 +424,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {

func appendWithExponentialGrowth[T any](a []T, v T) []T {
if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1)
newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
copy(newList, a)
a = newList
}
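
The tail of this helper is collapsed in the diff view; assuming it ends with return append(a, v) (as in upstream), a quick standalone check of the doubling policy:

package main

import "fmt"

const exponentialSliceGrowthFactor = 2

// appendWithGrowth mirrors the helper above (whose final return is collapsed
// in this diff view): when the slice is full, reallocate with roughly double
// the length before appending.
func appendWithGrowth[T any](a []T, v T) []T {
    if cap(a) < len(a)+1 {
        newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
        copy(newList, a)
        a = newList
    }
    return append(a, v)
}

func main() {
    var vals []string
    lastCap := -1
    for i := 0; i < 20; i++ {
        vals = appendWithGrowth(vals, fmt.Sprintf("v%d", i))
        if cap(vals) != lastCap {
            lastCap = cap(vals)
            fmt.Printf("len=%d cap=%d\n", len(vals), cap(vals))
        }
    }
    // Capacities grow 1, 3, 7, 15, 31, ... so repeated appends reallocate
    // only O(log n) times instead of relying on append's own heuristics.
}
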
@@ -383,7 +437,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm
}
list := appendWithExponentialGrowth(nm[l.Value], id)
vm, ok := nm[l.Value]
if !ok {
p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
}
list := appendWithExponentialGrowth(vm, id)
nm[l.Value] = list

if !p.ordered {
@@ -402,25 +460,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}

func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
// We'll copy the values into a slice and then match over that,
// We'll take the label values slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.
// Holding this lock prevents new series from being added (slows down the write path)
// and blocks the compaction process.
vals := p.labelValues(name)
for i, count := 0, 1; i < len(vals); count++ {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
//
// We just need to make sure we don't modify the slice we took,
// so we'll append matching values to a different one.
p.mtx.RLock()
readOnlyLabelValues := p.lvs[name]
p.mtx.RUnlock()

vals := make([]string, 0, len(readOnlyLabelValues))
for i, v := range readOnlyLabelValues {
if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}

if match(vals[i]) {
i++
continue
if match(v) {
vals = append(vals, v)
}

// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}

// If none matched (or this label had no values), no need to grab the lock again.
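
A sketch of calling PostingsForLabelMatching with a potentially slow matcher (a regexp here), which is the case the snapshot-then-match approach is designed for; it assumes the exported index API shown in this file and invents the series data:

package main

import (
    "context"
    "fmt"
    "regexp"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    p := index.NewMemPostings()
    p.Add(1, labels.FromStrings("job", "api-eu"))
    p.Add(2, labels.FromStrings("job", "api-us"))
    p.Add(3, labels.FromStrings("job", "db"))

    // The match function runs against a snapshot of the label values,
    // so the mutex is not held while the regexp is evaluated.
    re := regexp.MustCompile("^api-.*$")
    it := p.PostingsForLabelMatching(context.Background(), "job", re.MatchString)

    refs, err := index.ExpandPostings(it)
    fmt.Println(refs, err) // [1 2] <nil>
}
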
@@ -465,27 +525,6 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string
return Merge(ctx, its...)
}

// labelValues returns a slice of label values for the given label name.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

e := p.m[name]
if len(e) == 0 {
return nil
}

vals := make([]string, 0, len(e))
for v, srs := range e {
if len(srs) > 0 {
vals = append(vals, v)
}
}

return vals
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {
