Skip to content

Commit

Permalink
RangeQuery benchmark optimizations (#1413)
Browse files Browse the repository at this point in the history
* Added some duplicate streams.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Instead of using sort.Sort(), we keep tuples sorted already.

Also just return first element if there is only one tuple.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Don't do time comparison twice, compare nanoseconds instead.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany authored and cyriltovena committed Dec 13, 2019
1 parent f0f6f24 commit f6be636
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 19 deletions.
66 changes: 49 additions & 17 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"container/heap"
"fmt"
"io"
"sort"
"time"

"github.com/grafana/loki/pkg/helpers"
Expand Down Expand Up @@ -80,10 +79,18 @@ type iteratorMinHeap struct {

func (h iteratorMinHeap) Less(i, j int) bool {
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.Before(t2)

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}

type iteratorMaxHeap struct {
Expand All @@ -92,10 +99,18 @@ type iteratorMaxHeap struct {

func (h iteratorMaxHeap) Less(i, j int) bool {
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.After(t2)

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return false
case un1 > un2:
return true
default: // un1 == un2
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
}
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
}

// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
Expand All @@ -116,7 +131,7 @@ type heapIterator struct {
is []EntryIterator
prefetched bool

tuples tuples
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
Expand Down Expand Up @@ -184,12 +199,6 @@ type tuple struct {
EntryIterator
}

type tuples []tuple

func (t tuples) Len() int { return len(t) }
func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line }

func (i *heapIterator) Next() bool {
i.prefetch()

Expand All @@ -209,7 +218,8 @@ func (i *heapIterator) Next() bool {
}

heap.Pop(i.heap)
i.tuples = append(i.tuples, tuple{
// insert keeps i.tuples sorted
i.tuples = insert(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
Expand All @@ -229,8 +239,30 @@ func (i *heapIterator) Next() bool {
return true
}

func mostCommon(tuples tuples) tuple {
sort.Sort(tuples)
// Insert new tuple to correct position into ordered set of tuples.
// Insert sort is fast for small number of elements, and here we only expect max [number of replicas] elements.
func insert(ts []tuple, n tuple) []tuple {
ix := 0
for ix < len(ts) && ts[ix].Line <= n.Line {
ix++
}
if ix < len(ts) {
ts = append(ts, tuple{}) // zero element
copy(ts[ix+1:], ts[ix:])
ts[ix] = n
} else {
ts = append(ts, n)
}
return ts
}

// Expects that tuples are sorted already. We achieve that by using insert.
func mostCommon(tuples []tuple) tuple {
// trivial case, no need to do extra work.
if len(tuples) == 1 {
return tuples[0]
}

result := tuples[0]
count, max := 0, 0
for i := 0; i < len(tuples)-1; i++ {
Expand Down
26 changes: 24 additions & 2 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package iter

import (
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -241,18 +242,39 @@ func TestMostCommon(t *testing.T) {
}
require.Equal(t, "a", mostCommon(tuples).Entry.Line)

// Last is most common
tuples = []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "d"}},
}
require.Equal(t, "c", mostCommon(tuples).Entry.Line)
}

func TestInsert(t *testing.T) {
toInsert := []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "e"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "d"}},
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "c"}},
}

var ts []tuple
for _, e := range toInsert {
ts = insert(ts, e)
}

require.True(t, sort.SliceIsSorted(ts, func(i, j int) bool {
return ts[i].Line < ts[j].Line
}))
}

func TestEntryIteratorForward(t *testing.T) {
itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels)
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")
Expand Down
5 changes: 5 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,11 @@ func getLocalQuerier(size int64) Querier {
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)),
// some duplicates
iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
}
return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(iters, p.Direction), nil
Expand Down

0 comments on commit f6be636

Please sign in to comment.