diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 21619daca208..51736ee82ed9 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -4,7 +4,6 @@ import ( "container/heap" "fmt" "io" - "sort" "time" "github.com/grafana/loki/pkg/helpers" @@ -80,10 +79,18 @@ type iteratorMinHeap struct { func (h iteratorMinHeap) Less(i, j int) bool { t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp - if !t1.Equal(t2) { - return t1.Before(t2) + + un1 := t1.UnixNano() + un2 := t2.UnixNano() + + switch { + case un1 < un2: + return true + case un1 > un2: + return false + default: // un1 == un2: + return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() } - return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() } type iteratorMaxHeap struct { @@ -92,10 +99,18 @@ type iteratorMaxHeap struct { func (h iteratorMaxHeap) Less(i, j int) bool { t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp - if !t1.Equal(t2) { - return t1.After(t2) + + un1 := t1.UnixNano() + un2 := t2.UnixNano() + + switch { + case un1 < un2: + return false + case un1 > un2: + return true + default: // un1 == un2 + return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels() } - return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels() } // HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len @@ -116,7 +131,7 @@ type heapIterator struct { is []EntryIterator prefetched bool - tuples tuples + tuples []tuple currEntry logproto.Entry currLabels string errs []error @@ -184,12 +199,6 @@ type tuple struct { EntryIterator } -type tuples []tuple - -func (t tuples) Len() int { return len(t) } -func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line } - func (i *heapIterator) Next() bool { i.prefetch() @@ -209,7 +218,8 @@ func (i *heapIterator) Next() bool { } heap.Pop(i.heap) - i.tuples = append(i.tuples, tuple{ + // insert keeps i.tuples sorted + i.tuples = insert(i.tuples, tuple{ Entry: entry, EntryIterator: next, }) @@ -229,8 +239,30 @@ func (i *heapIterator) Next() bool { return true } -func mostCommon(tuples tuples) tuple { - sort.Sort(tuples) +// Insert new tuple to correct position into ordered set of tuples. +// Insert sort is fast for small number of elements, and here we only expect max [number of replicas] elements. +func insert(ts []tuple, n tuple) []tuple { + ix := 0 + for ix < len(ts) && ts[ix].Line <= n.Line { + ix++ + } + if ix < len(ts) { + ts = append(ts, tuple{}) // zero element + copy(ts[ix+1:], ts[ix:]) + ts[ix] = n + } else { + ts = append(ts, n) + } + return ts +} + +// Expects that tuples are sorted already. We achieve that by using insert. +func mostCommon(tuples []tuple) tuple { + // trivial case, no need to do extra work. + if len(tuples) == 1 { + return tuples[0] + } + result := tuples[0] count, max := 0, 0 for i := 0; i < len(tuples)-1; i++ { diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index fa1a6d83eb30..8344a90679e1 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -2,6 +2,7 @@ package iter import ( "fmt" + "sort" "testing" "time" @@ -241,18 +242,39 @@ func TestMostCommon(t *testing.T) { } require.Equal(t, "a", mostCommon(tuples).Entry.Line) - // Last is most common tuples = []tuple{ {Entry: logproto.Entry{Line: "a"}}, {Entry: logproto.Entry{Line: "b"}}, - {Entry: logproto.Entry{Line: "c"}}, {Entry: logproto.Entry{Line: "b"}}, {Entry: logproto.Entry{Line: "c"}}, {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "d"}}, } require.Equal(t, "c", mostCommon(tuples).Entry.Line) } +func TestInsert(t *testing.T) { + toInsert := []tuple{ + {Entry: logproto.Entry{Line: "a"}}, + {Entry: logproto.Entry{Line: "e"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "b"}}, + {Entry: logproto.Entry{Line: "d"}}, + {Entry: logproto.Entry{Line: "a"}}, + {Entry: logproto.Entry{Line: "c"}}, + } + + var ts []tuple + for _, e := range toInsert { + ts = insert(ts, e) + } + + require.True(t, sort.SliceIsSorted(ts, func(i, j int) bool { + return ts[i].Line < ts[j].Line + })) +} + func TestEntryIteratorForward(t *testing.T) { itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 4a60063876a4..f0e2e7e6c2fe 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -764,6 +764,11 @@ func getLocalQuerier(size int64) Querier { iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)), iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)), iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)), + // some duplicates + iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)), + iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)), + iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)), + iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)), } return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) { return iter.NewHeapIterator(iters, p.Direction), nil