Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Improve Merge performance
Browse files Browse the repository at this point in the history
Use a heap for Next for merges, and
pre-compute if there's many postings on the
unset path.

Add posting lookup benchmarks

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
  • Loading branch information
brian-brazil committed Feb 21, 2019
1 parent c59ed49 commit 10f171a
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 24 deletions.
55 changes: 47 additions & 8 deletions head_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,56 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
testutil.Ok(b, err)
defer h.Close()

// TODO: vary number of series
for i := 0; i < 1000000; i++ {
h.getOrCreate(uint64(i), labels.FromStrings("a", strconv.Itoa(i)))
var hash uint64
for n := 0; n < 10; n++ {
for i := 0; i < 100000; i++ {
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "foo"))
hash++
h.getOrCreate(hash, labels.FromStrings("i", strconv.Itoa(i), "n", strconv.Itoa(i), "j", "bar"))
hash++
}
}

b.ResetTimer()
n1 := labels.NewEqualMatcher("n", "1")

all, _ := labels.NewRegexpMatcher("a", ".*")
jFoo := labels.NewEqualMatcher("j", "foo")
jNotFoo := labels.Not(jFoo)

for i := 0; i < b.N; i++ {
_, err := PostingsForMatchers(h.indexRange(0, 1000), all)
testutil.Ok(b, err)
iStar := labels.NewMustRegexpMatcher("i", "^.*$")
iPlus := labels.NewMustRegexpMatcher("i", "^.+$")
i1Plus := labels.NewMustRegexpMatcher("i", "^1.+$")
iEmptyRe := labels.NewMustRegexpMatcher("i", "^$")
iNotEmpty := labels.Not(labels.NewEqualMatcher("i", ""))
iNot2 := labels.Not(labels.NewEqualMatcher("n", "2"))
iNot2Star := labels.Not(labels.NewMustRegexpMatcher("i", "^2.*$"))

cases := []struct {
name string
matchers []labels.Matcher
}{
{`n="1"`, []labels.Matcher{n1}},
{`n="1",j="foo"`, []labels.Matcher{n1, jFoo}},
{`j="foo",n="1"`, []labels.Matcher{jFoo, n1}},
{`n="1",j!="foo"`, []labels.Matcher{n1, jNotFoo}},
{`i=~".*"`, []labels.Matcher{iStar}},
{`i=~".+"`, []labels.Matcher{iPlus}},
{`i=~""`, []labels.Matcher{iEmptyRe}},
{`i!=""`, []labels.Matcher{iNotEmpty}},
{`n="1",i=~".*",j="foo"`, []labels.Matcher{n1, iStar, jFoo}},
{`n="1",i=~".*",i!="2",j="foo"`, []labels.Matcher{n1, iStar, iNot2, jFoo}},
{`n="1",i!="",j="foo"`, []labels.Matcher{n1, iNotEmpty, jFoo}},
{`n="1",i=~".+",j="foo"`, []labels.Matcher{n1, iPlus, jFoo}},
{`n="1",i=~"1.+",j="foo"`, []labels.Matcher{n1, i1Plus, jFoo}},
{`n="1",i=~".+",i!="2",j="foo"`, []labels.Matcher{n1, iPlus, iNot2, jFoo}},
{`n="1",i=~".+",i!~"2.*",j="foo"`, []labels.Matcher{n1, iPlus, iNot2Star, jFoo}},
}

for _, c := range cases {
b.Run(c.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := PostingsForMatchers(h.indexRange(0, 1000), c.matchers...)
testutil.Ok(b, err)
}
})
}
}
140 changes: 125 additions & 15 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package index

import (
"container/heap"
"encoding/binary"
"runtime"
"sort"
Expand Down Expand Up @@ -365,25 +366,131 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 {
return its[0]
}
// All the uses of this function immediately expand it, so
// collect everything in a map. This is more efficient
// when there's 100ks of postings, compared to
// having a tree of merge objects.
pm := make(map[uint64]struct{}, len(its))
for _, it := range its {
for it.Next() {
pm[it.At()] = struct{}{}
return newMergedPostings(its)
}

type PostingsHeap []Postings

func (h PostingsHeap) Len() int { return len(h) }
func (h PostingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
func (h *PostingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }

func (h *PostingsHeap) Push(x interface{}) {
*h = append(*h, x.(Postings))
}

func (h *PostingsHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

type mergedPostings struct {
h PostingsHeap
initilized bool
heaped bool
cur uint64
err error
}

func newMergedPostings(p []Postings) *mergedPostings {
ph := make(PostingsHeap, 0, len(p))
for _, it := range p {
if it.Next() {
ph = append(ph, it)
} else {
if it.Err() != nil {
return &mergedPostings{err: it.Err()}
}
}
}
return &mergedPostings{h: ph}
}

func (it *mergedPostings) Next() bool {
if it.h.Len() == 0 || it.err != nil {
return false
}

if !it.initilized {
it.cur = it.h[0].At()
it.initilized = true
return true
}
if !it.heaped {
heap.Init(&(it.h))
it.heaped = true
}

for {
cur := it.h[0]
if !cur.Next() {
heap.Pop(&(it.h))
if cur.Err() != nil {
it.err = cur.Err()
return false
}
if it.h.Len() == 0 {
return false
}
} else {
heap.Fix(&(it.h), 0)
}
if it.Err() != nil {
return ErrPostings(it.Err())

cur = it.h[0]
if it.h[0].At() != it.cur {
it.cur = it.h[0].At()
return true
}
}
}

func (it *mergedPostings) Seek(id uint64) bool {
if it.h.Len() == 0 || it.err != nil {
return false
}
if !it.initilized {
if !it.Next() {
return false
}
it.initilized = true
}
pl := make([]uint64, 0, len(pm))
for p := range pm {
pl = append(pl, p)
if it.cur >= id {
return true
}
sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] })
return newListPostings(pl)
// Heapifying when there is lots of Seeks is inefficient.
it.heaped = false
newH := make(PostingsHeap, 0, len(it.h))
lowest := ^uint64(0)
for _, i := range it.h {
if i.Seek(id) {
newH = append(newH, i)
if i.At() < lowest {
lowest = i.At()
}
} else {
if i.Err() != nil {
it.err = i.Err()
return false
}
}
}
it.h = newH
if len(it.h) == 0 {
return false
}
it.cur = lowest
return true
}

func (it mergedPostings) At() uint64 {
return it.cur
}

func (it mergedPostings) Err() error {
return it.err
}

// Without returns a new postings list that contains all elements from the full list that
Expand Down Expand Up @@ -498,6 +605,9 @@ func (it *listPostings) Seek(x uint64) bool {
if it.cur >= x {
return true
}
if len(it.list) == 0 {
return false
}

// Do binary search between current position and end.
i := sort.Search(len(it.list), func(i int) bool {
Expand Down
13 changes: 12 additions & 1 deletion querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,22 @@ func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Posti
rit = append(rit, it)
}

merged := index.Merge(rit...)
// With many many postings, it's best to pre-calculate
// the merged list via next rather than have a ton of seeks.
if len(rit) > 100 {
pl, err := index.ExpandPostings(merged)
if err != nil {
return nil, err
}
merged = index.NewListPostings(pl)
}

allPostings, err := ix.Postings(index.AllPostingsKey())
if err != nil {
return nil, err
}
return index.Without(allPostings, index.Merge(rit...)), nil
return index.Without(allPostings, merged), nil
}

func mergeStrings(a, b []string) []string {
Expand Down

0 comments on commit 10f171a

Please sign in to comment.