[weekly-r319] MemPostings: keep a map of label values slices (#15426) #779

Merged: 1 commit, Dec 2, 2024
169 changes: 104 additions & 65 deletions tsdb/index/postings.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/binary"
"fmt"
"maps"
"math"
"runtime"
"slices"
@@ -32,6 +33,8 @@ import (
"github.com/prometheus/prometheus/storage"
)

const exponentialSliceGrowthFactor = 2

var allPostingsKey = labels.Label{}

// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -55,15 +58,33 @@ var ensureOrderBatchPool = sync.Pool{
// EnsureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
mtx sync.RWMutex
m map[string]map[string][]storage.SeriesRef
mtx sync.RWMutex

// m holds the postings lists for each label-value pair, indexed first by label name, and then by label value.
//
// mtx must be held when interacting with m (the appropriate one for reading or writing).
// It is safe to retain a reference to a postings list after releasing the lock.
//
// BUG: There's currently a data race in addFor, which might modify the tail of the postings list:
// https://github.com/prometheus/prometheus/issues/15317
m map[string]map[string][]storage.SeriesRef

// lvs holds the label values for each label name.
// lvs[name] is essentially an unsorted append-only list of all keys in m[name].
// mtx must be held when interacting with lvs.
// Since it's append-only, it is safe to read the label values slice after releasing the lock.
lvs map[string][]string

ordered bool
}

const defaultLabelNamesMapSize = 512

// NewMemPostings returns a memPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
lvs: make(map[string][]string, defaultLabelNamesMapSize),
ordered: true,
}
}
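The heart of the change is that m and lvs are kept in lockstep: every key of m[name] appears exactly once in lvs[name], because a value is appended to lvs only the first time it is seen for that name. A minimal, self-contained sketch of that invariant (a hypothetical miniPostings type with a simplified seriesRef and no locking, not the real MemPostings):

```go
// Sketch only: a two-level postings index that keeps a parallel, append-only
// slice of label values per label name, so readers can list values without
// walking the keys of the inner map.
package main

import "fmt"

type seriesRef uint64

type miniPostings struct {
	m   map[string]map[string][]seriesRef // label name -> label value -> series refs
	lvs map[string][]string               // label name -> values, appended on first sight
}

func newMiniPostings() *miniPostings {
	return &miniPostings{
		m:   map[string]map[string][]seriesRef{},
		lvs: map[string][]string{},
	}
}

func (p *miniPostings) add(id seriesRef, name, value string) {
	vm, ok := p.m[name]
	if !ok {
		vm = map[string][]seriesRef{}
		p.m[name] = vm
	}
	if _, seen := vm[value]; !seen {
		// First time this value shows up for this name: record it in lvs too,
		// keeping the invariant that lvs[name] holds exactly the keys of m[name].
		p.lvs[name] = append(p.lvs[name], value)
	}
	vm[value] = append(vm[value], id)
}

func main() {
	p := newMiniPostings()
	p.add(1, "job", "api")
	p.add(2, "job", "api")
	p.add(3, "job", "db")
	fmt.Println(p.lvs["job"])      // [api db]
	fmt.Println(p.m["job"]["api"]) // [1 2]
}
```

Readers that only need the value strings (Symbols, LabelValues, PostingsForLabelMatching) can then walk lvs[name] instead of iterating the keys of the inner map while holding the lock.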
@@ -72,24 +93,28 @@ func NewMemPostings() *MemPostings {
// until EnsureOrder() was called once.
func NewUnorderedMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
m: make(map[string]map[string][]storage.SeriesRef, defaultLabelNamesMapSize),
lvs: make(map[string][]string, defaultLabelNamesMapSize),
ordered: false,
}
}

// Symbols returns an iterator over all unique name and value strings, in order.
func (p *MemPostings) Symbols() StringIter {
p.mtx.RLock()
// Make a quick clone of the map to avoid holding the lock while iterating.
// It's safe to use the values of the map after releasing the lock, as they're append-only slices.
lvs := maps.Clone(p.lvs)
p.mtx.RUnlock()

// Add all the strings to a map to de-duplicate.
symbols := make(map[string]struct{}, 512)
for n, e := range p.m {
symbols := make(map[string]struct{}, defaultLabelNamesMapSize)
for n, labelValues := range lvs {
symbols[n] = struct{}{}
for v := range e {
for _, v := range labelValues {
symbols[v] = struct{}{}
}
}
p.mtx.RUnlock()

res := make([]string, 0, len(symbols))
for k := range symbols {
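For context on the maps.Clone call above: it is a shallow copy, so only the outer map is duplicated while the []string values keep pointing at the same backing arrays. That is safe here precisely because those slices are append-only. A standalone sketch of the snapshot pattern, using a hypothetical store type rather than the real MemPostings:

```go
// Sketch only: snapshot the outer map under the read lock, then do the slow
// de-duplication work without holding it. maps.Clone is shallow, so the value
// slices are shared with the live map; that is fine because they are
// append-only and never mutated in place.
package main

import (
	"fmt"
	"maps"
	"slices"
	"sync"
)

type store struct {
	mtx sync.RWMutex
	lvs map[string][]string // label name -> label values
}

func (s *store) symbols() []string {
	s.mtx.RLock()
	snapshot := maps.Clone(s.lvs) // O(label names), not O(label values)
	s.mtx.RUnlock()

	seen := map[string]struct{}{}
	for name, values := range snapshot { // no lock held from here on
		seen[name] = struct{}{}
		for _, v := range values {
			seen[v] = struct{}{}
		}
	}
	out := make([]string, 0, len(seen))
	for sym := range seen {
		out = append(out, sym)
	}
	slices.Sort(out)
	return out
}

func main() {
	s := &store{lvs: map[string][]string{"job": {"api", "db"}}}
	fmt.Println(s.symbols()) // [api db job]
}
```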
@@ -145,13 +170,14 @@ func (p *MemPostings) LabelNames() []string {
// LabelValues returns label values for the given name.
func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()
values := p.lvs[name]
p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values = append(values, v)
}
return values
// The slice from p.lvs[name] is shared between all readers, and it is append-only.
// Since it's shared, we need to make a copy of it before returning it to make
// sure that no caller modifies the original one by sorting it or filtering it.
// Since it's append-only, we can do this while not holding the mutex anymore.
return slices.Clone(values)
}
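The slices.Clone above is a defensive copy rather than a concurrency requirement: the shared slice may be read without the lock, but returning it directly would let a caller's in-place sort reorder the backing array under every other reader. A small, self-contained demonstration with made-up values:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	shared := []string{"db", "api", "cache"} // stands in for p.lvs[name]

	leaked := shared // returning the slice directly hands out the same backing array
	slices.Sort(leaked)
	fmt.Println(shared) // [api cache db]: the caller's sort reordered the shared data

	shared = []string{"db", "api", "cache"} // reset
	private := slices.Clone(shared)         // what LabelValues does after this change
	slices.Sort(private)
	fmt.Println(shared)  // [db api cache]: untouched
	fmt.Println(private) // [api cache db]
}
```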

// PostingsStats contains cardinality based statistics for postings.
@@ -294,6 +320,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.mtx.Lock()
defer p.mtx.Unlock()

affectedLabelNames := map[string]struct{}{}
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +333,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
affectedLabelNames[l.Name] = struct{}{}
}
}

@@ -323,22 +347,52 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
// Note that a read query will most likely want to read multiple postings lists, say 5, 10 or 20 (depending on the number of matchers).
// And that read query will most likely evaluate only one of those matchers before we unpause here, so we want to pause often.
if i%512 == 0 {
p.mtx.Unlock()
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
// it wouldn't ensure that readers actually were able to get the read lock,
// because if there are writers waiting on the same mutex, readers won't be able to get it.
// So we just grab one RLock ourselves.
p.mtx.RLock()
// We shouldn't wait here, because we would be blocking a potential write for no reason.
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
// If we were deleting 100M series here, pausing every 512 with 1ms sleeps would be an extra of 200s, which is negligible.
time.Sleep(time.Millisecond)
p.mtx.Lock()
p.unlockWaitAndLockAgain()
}
}
process(allPostingsKey)

// Now we need to update the label values slices.
i = 0
for name := range affectedLabelNames {
i++
// From time to time we want some readers to go through and read their postings.
if i%512 == 0 {
p.unlockWaitAndLockAgain()
}

if len(p.m[name]) == 0 {
// Delete the label name key if we deleted all values.
delete(p.m, name)
delete(p.lvs, name)
continue
}

// Create the new slice with enough room to grow without reallocating.
// We have deleted values here, so there's definitely some churn; be prepared for it.
lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
for v := range p.m[name] {
lvs = append(lvs, v)
}
p.lvs[name] = lvs
}
}
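Delete can only shrink m[name], so once the affected names are known the corresponding lvs[name] slices are rebuilt from the surviving keys; the exponentialSliceGrowthFactor headroom is there because deletions imply churn and new values are likely to be appended soon. A sketch of that rebuild step in isolation (hypothetical rebuildValues helper, uint64 standing in for storage.SeriesRef):

```go
package main

import "fmt"

// rebuildValues builds a fresh values slice from the keys that still have
// postings, with twice the needed capacity so near-future appends are cheap.
func rebuildValues(surviving map[string][]uint64) []string {
	lvs := make([]string, 0, 2*len(surviving))
	for v := range surviving {
		lvs = append(lvs, v)
	}
	return lvs
}

func main() {
	m := map[string][]uint64{"api": {1, 2}, "db": {3}} // values that survived the delete
	vals := rebuildValues(m)
	fmt.Println(len(vals), cap(vals)) // 2 4
}
```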

// unlockWaitAndLockAgain will unlock an already locked p.mtx and then wait a little bit before locking it again,
// letting the RLock()-waiting goroutines get the lock.
func (p *MemPostings) unlockWaitAndLockAgain() {
p.mtx.Unlock()
// While it's tempting to just do a `time.Sleep(time.Millisecond)` here,
// it wouldn't ensure that readers actually were able to get the read lock,
// because if there are writers waiting on the same mutex, readers won't be able to get it.
// So we just grab one RLock ourselves.
p.mtx.RLock()
// We shouldn't wait here, because we would be blocking a potential write for no reason.
// Note that if there's a writer waiting for us to unlock, no reader will be able to get the read lock.
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
// Now we can wait a little bit just to increase the chance of a reader getting the lock.
time.Sleep(time.Millisecond)
p.mtx.Lock()
}
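The empty RLock/RUnlock pair exists because of how sync.RWMutex schedules waiters: once a writer is queued, new readers block, so merely sleeping would not guarantee that pending readers get served. By taking the read lock itself, the writer knows any readers queued ahead of it have run. A self-contained sketch of the same yield pattern, using a hypothetical guarded type:

```go
package main

import (
	"sync"
	"time"
)

type guarded struct {
	mtx  sync.RWMutex
	data map[int]int
}

// bigRewrite holds the write lock for a long time but periodically yields,
// mirroring MemPostings.unlockWaitAndLockAgain.
func (g *guarded) bigRewrite(n int) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	for i := 0; i < n; i++ {
		g.data[i] = i * i
		if i > 0 && i%512 == 0 {
			g.mtx.Unlock()
			// Taking (and immediately releasing) the read lock proves that the
			// readers queued before us were able to run.
			g.mtx.RLock()
			g.mtx.RUnlock() //nolint:staticcheck // intentionally empty critical section
			// A short sleep then gives further readers a chance to slip in.
			time.Sleep(time.Millisecond)
			g.mtx.Lock()
		}
	}
}

func main() {
	g := &guarded{data: map[int]int{}}
	g.bigRewrite(2048)
}
```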

// Iter calls f for each postings list. It aborts if f returns an error and returns it.
@@ -370,7 +424,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {

func appendWithExponentialGrowth[T any](a []T, v T) []T {
if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1)
newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
copy(newList, a)
a = newList
}
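For reference, this helper roughly doubles capacity whenever the slice is full, which keeps n appends at O(n) amortized copying instead of O(n²) if it grew one element at a time. The sketch below reproduces it in isolation (the trailing return append(a, v) is cut off by the hunk above and assumed here) and prints each capacity jump:

```go
package main

import "fmt"

func appendWithExponentialGrowth[T any](a []T, v T) []T {
	if cap(a) < len(a)+1 {
		newList := make([]T, len(a), len(a)*2+1) // capacities go 1, 3, 7, 15, 31, ...
		copy(newList, a)
		a = newList
	}
	return append(a, v)
}

func main() {
	var s []int
	lastCap := -1
	for i := 0; i < 1000; i++ {
		s = appendWithExponentialGrowth(s, i)
		if cap(s) != lastCap {
			lastCap = cap(s)
			fmt.Println("len", len(s), "cap", cap(s))
		}
	}
}
```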
@@ -383,7 +437,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm
}
list := appendWithExponentialGrowth(nm[l.Value], id)
vm, ok := nm[l.Value]
if !ok {
p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
}
list := appendWithExponentialGrowth(vm, id)
nm[l.Value] = list

if !p.ordered {
@@ -402,25 +460,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}

func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
// We'll copy the values into a slice and then match over that,
// We'll take the label values slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.
// Holding this lock prevents new series from being added (slows down the write path)
// and blocks the compaction process.
vals := p.labelValues(name)
for i, count := 0, 1; i < len(vals); count++ {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
//
// We just need to make sure we don't modify the slice we took,
// so we'll append matching values to a different one.
p.mtx.RLock()
readOnlyLabelValues := p.lvs[name]
p.mtx.RUnlock()

vals := make([]string, 0, len(readOnlyLabelValues))
for i, v := range readOnlyLabelValues {
if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}

if match(vals[i]) {
i++
continue
if match(v) {
vals = append(vals, v)
}

// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}

// If none matched (or this label had no values), no need to grab the lock again.
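The rewritten loop appends matches to a fresh slice instead of compacting in place, because its input is now the shared, append-only lvs slice and must not be reordered (the old code was free to shuffle its private copy). A minimal sketch of that filter step, with a hypothetical matchingValues helper and the context-cancellation check only described in a comment:

```go
package main

import (
	"fmt"
	"strings"
)

// matchingValues collects the accepted values into a new slice and leaves the
// shared input untouched. The real method also checks ctx.Err() once every
// checkContextEveryNIterations values so a slow regex can be cancelled mid-scan.
func matchingValues(readOnly []string, match func(string) bool) []string {
	vals := make([]string, 0, len(readOnly))
	for _, v := range readOnly {
		if match(v) {
			vals = append(vals, v)
		}
	}
	return vals
}

func main() {
	values := []string{"api", "db", "api-canary"} // stands in for p.lvs[name]
	isAPI := func(s string) bool { return strings.HasPrefix(s, "api") }
	fmt.Println(matchingValues(values, isAPI)) // [api api-canary]
}
```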
@@ -465,27 +525,6 @@ func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string
return Merge(ctx, its...)
}

// labelValues returns a slice of label values for the given label name.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

e := p.m[name]
if len(e) == 0 {
return nil
}

vals := make([]string, 0, len(e))
for v, srs := range e {
if len(srs) > 0 {
vals = append(vals, v)
}
}

return vals
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {