Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up EntrySortIterator by 20%. #5318

Merged
merged 14 commits into from
Feb 9, 2022
3 changes: 2 additions & 1 deletion pkg/chunkenc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func fillChunkClose(c Chunk, close bool) int64 {
func fillChunkRandomOrder(c Chunk, close bool) {
ub := int64(1 << 30)
i := int64(0)
random := rand.New(rand.NewSource(42))
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: testdata.LogString(i),
Expand All @@ -77,7 +78,7 @@ func fillChunkRandomOrder(c Chunk, close bool) {
panic(err)
}
i++
entry.Timestamp = time.Unix(0, rand.Int63n(ub))
entry.Timestamp = time.Unix(0, random.Int63n(ub))
entry.Line = testdata.LogString(i)

}
Expand Down
99 changes: 75 additions & 24 deletions pkg/iter/entry_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/heap"
"context"
"io"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -295,15 +296,12 @@ func (i *mergeEntryIterator) Len() int {
}

type entrySortIterator struct {
heap interface {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prefetched bool

currEntry entryWithLabels
errs []error
is []EntryIterator
prefetched bool
byAlphabetical bool
byAscendingTime bool
currEntry entryWithLabels
errs []error
}

// NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators.
Expand All @@ -320,25 +318,56 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr
result := &entrySortIterator{is: is}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true}
result.byAscendingTime = false
result.byAlphabetical = true
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
case logproto.FORWARD:
result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true}
result.byAscendingTime = true
result.byAlphabetical = true
default:
panic("bad direction")
}
return result
}

// init initialize the underlaying heap
func (i *entrySortIterator) lessByIndex(k, j int) bool {
t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano()
if t1 == t2 {
if i.byAlphabetical {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
return i.is[k].Labels() < i.is[j].Labels()
}
return i.is[k].StreamHash() < i.is[j].StreamHash()
}
if i.byAscendingTime {
return t1 < t2
}
return t1 > t2
}

func (i *entrySortIterator) lessByValue(t1 int64, l1 string, h1 uint64, index int) bool {
t2 := i.is[index].Entry().Timestamp.UnixNano()
if t1 == t2 {
if i.byAlphabetical {
return l1 < i.is[index].Labels()
}
return h1 < i.is[index].StreamHash()
}
if i.byAscendingTime {
return t1 < t2
}
return t1 > t2
}

// init throws out empty iterators and sorts them.
func (i *entrySortIterator) init() {
if i.prefetched {
return
}

i.prefetched = true
tmp := make([]EntryIterator, 0, len(i.is))
for _, it := range i.is {
if it.Next() {
i.heap.Push(it)
tmp = append(tmp, it)
continue
}

Expand All @@ -347,36 +376,58 @@ func (i *entrySortIterator) init() {
}
util.LogError("closing iterator", it.Close)
}
heap.Init(i.heap)
i.is = tmp
sort.Slice(i.is, i.lessByIndex)
}

// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
func (i *entrySortIterator) fix() {
head := i.is[0]
t1 := head.Entry().Timestamp.UnixNano()
l1 := head.Labels()
h1 := head.StreamHash()

// shortcut
if len(i.is) <= 1 || i.lessByValue(t1, l1, h1, 1) {
return
}

// First element is out of place. So we reposition it.
i.is = i.is[1:] // drop head
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, h1, in) })

if index == len(i.is) {
i.is = append(i.is, head)
} else {
i.is = append(i.is[:index+1], i.is[index:]...)
i.is[index] = head
}
}

func (i *entrySortIterator) Next() bool {
i.init()

if i.heap.Len() == 0 {
if len(i.is) == 0 {
return false
}

next := i.heap.Peek()
next := i.is[0]
i.currEntry.entry = next.Entry()
i.currEntry.labels = next.Labels()
i.currEntry.streamHash = next.StreamHash()
// if the top iterator is empty, we remove it.
if !next.Next() {
heap.Pop(i.heap)
i.is = i.is[1:]
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if i.heap.Len() > 1 {
heap.Fix(i.heap, 0)

if len(i.is) > 1 {
i.fix()
}

return true
}

Expand Down Expand Up @@ -404,8 +455,8 @@ func (i *entrySortIterator) Error() error {
}

func (i *entrySortIterator) Close() error {
for i.heap.Len() > 0 {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
if len(i.is) > 0 {
if err := i.is[0].Close(); err != nil {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/iter/entry_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ func BenchmarkSortIterator(b *testing.B) {
})

b.Run("sort", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
Expand Down