diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index c4e3de043dc1..a65bdbcae105 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -66,6 +66,7 @@ func fillChunkClose(c Chunk, close bool) int64 { func fillChunkRandomOrder(c Chunk, close bool) { ub := int64(1 << 30) i := int64(0) + random := rand.New(rand.NewSource(42)) entry := &logproto.Entry{ Timestamp: time.Unix(0, 0), Line: testdata.LogString(i), @@ -77,7 +78,7 @@ func fillChunkRandomOrder(c Chunk, close bool) { panic(err) } i++ - entry.Timestamp = time.Unix(0, rand.Int63n(ub)) + entry.Timestamp = time.Unix(0, random.Int63n(ub)) entry.Line = testdata.LogString(i) } diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 988e0051218b..25c160920ea3 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "io" + "sort" "sync" "time" @@ -295,15 +296,11 @@ func (i *mergeEntryIterator) Len() int { } type entrySortIterator struct { - heap interface { - heap.Interface - Peek() EntryIterator - } - is []EntryIterator - prefetched bool - - currEntry entryWithLabels - errs []error + is []EntryIterator + prefetched bool + byAscendingTime bool + currEntry entryWithLabels + errs []error } // NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators. @@ -320,25 +317,48 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr result := &entrySortIterator{is: is} switch direction { case logproto.BACKWARD: - result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true} + result.byAscendingTime = false case logproto.FORWARD: - result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true} + result.byAscendingTime = true default: panic("bad direction") } return result } -// init initialize the underlaying heap +func (i *entrySortIterator) lessByIndex(k, j int) bool { + t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano() + if t1 == t2 { + return i.is[k].Labels() < i.is[j].Labels() + } + if i.byAscendingTime { + return t1 < t2 + } + return t1 > t2 +} + +func (i *entrySortIterator) lessByValue(t1 int64, l1 string, index int) bool { + t2 := i.is[index].Entry().Timestamp.UnixNano() + if t1 == t2 { + return l1 < i.is[index].Labels() + } + if i.byAscendingTime { + return t1 < t2 + } + return t1 > t2 +} + +// init throws out empty iterators and sorts them. func (i *entrySortIterator) init() { if i.prefetched { return } i.prefetched = true + tmp := make([]EntryIterator, 0, len(i.is)) for _, it := range i.is { if it.Next() { - i.heap.Push(it) + tmp = append(tmp, it) continue } @@ -347,36 +367,57 @@ func (i *entrySortIterator) init() { } util.LogError("closing iterator", it.Close) } - heap.Init(i.heap) + i.is = tmp + sort.Slice(i.is, i.lessByIndex) +} - // We can now clear the list of input iterators to merge, given they have all - // been processed and the non empty ones have been pushed to the heap - i.is = nil +func (i *entrySortIterator) fix() { + head := i.is[0] + t1 := head.Entry().Timestamp.UnixNano() + l1 := head.Labels() + + // shortcut + if len(i.is) <= 1 || i.lessByValue(t1, l1, 1) { + return + } + + // First element is out of place. So we reposition it. + i.is = i.is[1:] // drop head + index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, in) }) + + if index == len(i.is) { + i.is = append(i.is, head) + } else { + i.is = append(i.is[:index+1], i.is[index:]...) + i.is[index] = head + } } func (i *entrySortIterator) Next() bool { i.init() - if i.heap.Len() == 0 { + if len(i.is) == 0 { return false } - next := i.heap.Peek() + next := i.is[0] i.currEntry.entry = next.Entry() i.currEntry.labels = next.Labels() i.currEntry.streamHash = next.StreamHash() // if the top iterator is empty, we remove it. if !next.Next() { - heap.Pop(i.heap) + i.is = i.is[1:] if err := next.Error(); err != nil { i.errs = append(i.errs, err) } util.LogError("closing iterator", next.Close) return true } - if i.heap.Len() > 1 { - heap.Fix(i.heap, 0) + + if len(i.is) > 1 { + i.fix() } + return true } @@ -404,8 +445,8 @@ func (i *entrySortIterator) Error() error { } func (i *entrySortIterator) Close() error { - for i.heap.Len() > 0 { - if err := i.heap.Pop().(EntryIterator).Close(); err != nil { + for _, entryIterator := range i.is { + if err := entryIterator.Close(); err != nil { return err } } diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index 3db309288763..7777ef674fe6 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -735,6 +735,7 @@ func BenchmarkSortIterator(b *testing.B) { }) b.Run("sort", func(b *testing.B) { + b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer()