Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up EntrySortIterator by 20%. #5318

Merged
merged 14 commits into from
Feb 9, 2022
104 changes: 75 additions & 29 deletions pkg/iter/entry_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/heap"
"context"
"io"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -323,12 +324,9 @@ func (i *mergeEntryIterator) Len() int {
}

type entrySortIterator struct {
heap interface {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prefetched bool
dir logproto.Direction

currEntry entryWithLabels
errs []error
Expand All @@ -344,28 +342,61 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr
if len(is) == 1 {
return is[0]
}
result := &entrySortIterator{is: is}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
case logproto.FORWARD:
result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
default:
panic("bad direction")
}

result := &entrySortIterator{is: is, dir: direction}
return result
}

// init initialize the underlaying heap
func (it *entrySortIterator) less(i, j int) bool {
t1, t2 := it.is[i].Entry().Timestamp, it.is[j].Entry().Timestamp
if it.dir == logproto.BACKWARD {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
t2, t1 = t1, t2

}

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return it.is[i].Labels() < it.is[j].Labels()
}
}

func (it *entrySortIterator) lessThan(t1 time.Time, l1 string, j int) bool {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
t2 := it.is[j].Entry().Timestamp
if it.dir == logproto.BACKWARD {
t2, t1 = t1, t2
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return l1 < it.is[j].Labels()
}
}

// init throws out empty iterators and sorts them.
func (i *entrySortIterator) init() {
if i.prefetched {
return
}

i.prefetched = true
tmp := make([]EntryIterator, 0, len(i.is))
for _, it := range i.is {
if it.Next() {
i.heap.Push(it)
tmp = append(tmp, it)
continue
}

Expand All @@ -374,35 +405,55 @@ func (i *entrySortIterator) init() {
}
util.LogError("closing iterator", it.Close)
}
heap.Init(i.heap)
i.is = tmp
sort.Slice(i.is, i.less)
}

// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
func (i *entrySortIterator) fix() {
t1 := i.is[0].Entry().Timestamp
l1 := i.is[0].Labels()
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

// shortcut
if len(i.is) <= 1 || i.lessThan(t1, l1, 1) {
return
}

// First element is out of place. So we reposition it.
index := sort.Search(len(i.is), func(in int) bool { return i.lessThan(t1, l1, in) })

head := i.is[0]
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if index == len(i.is) {
i.is = append(i.is[1:], head)
} else {
i.is = append(i.is[1:index+1], i.is[index:]...)
i.is[index] = head
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *entrySortIterator) Next() bool {
i.init()

if i.heap.Len() == 0 {
if len(i.is) == 0 {
return false
}

next := i.heap.Peek()
next := i.is[0]
i.currEntry.entry = next.Entry()
i.currEntry.labels = next.Labels()
// if the top iterator is empty, we remove it.
if !next.Next() {
heap.Pop(i.heap)
i.is = i.is[1:]
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if i.heap.Len() > 1 {
heap.Fix(i.heap, 0)

if len(i.is) > 1 {
i.fix()
}

return true
}

Expand All @@ -426,11 +477,6 @@ func (i *entrySortIterator) Error() error {
}

func (i *entrySortIterator) Close() error {
for i.heap.Len() > 0 {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
}
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/iter/entry_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ func BenchmarkSortIterator(b *testing.B) {
})

b.Run("sort", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
Expand Down
3 changes: 3 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,9 @@ github.com/prometheus/prometheus/util/teststorage
github.com/prometheus/prometheus/util/testutil
github.com/prometheus/prometheus/util/treecache
github.com/prometheus/prometheus/web/api/v1
# github.com/psilva261/timsort/v2 v2.0.0
## explicit; go 1.13
github.com/psilva261/timsort/v2
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
# github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
## explicit
github.com/rcrowley/go-metrics
Expand Down