Fixes iterator At value re-used (#845)
* Fixes iterator At value re-used

* make fmt
cyriltovena authored Jul 12, 2023
1 parent 11fcb24 commit e154533
Showing 5 changed files with 126 additions and 30 deletions.
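
For context, the bug being fixed: BufferedRowReaderIterator.At() returns a parquet.Row backed by the iterator's internal buffer, and that buffer is overwritten on the next refill, so rows handed out by the merge reader could change under the caller's feet. The following standalone Go sketch (illustrative only, not phlare code; reusingReader is invented) shows the aliasing hazard and the clone fix this commit applies to the real code paths below.

package main

import "fmt"

// reusingReader hands out the same backing slice on every call, the way a
// buffered row reader reuses its internal buffer between refills.
type reusingReader struct {
	buf []int
	n   int
}

// Next overwrites the shared buffer in place and returns it.
func (r *reusingReader) Next() []int {
	for i := range r.buf {
		r.buf[i] = r.n
		r.n++
	}
	return r.buf
}

func main() {
	r := &reusingReader{buf: make([]int, 2)}

	first := r.Next()                    // aliases r.buf: [0 1]
	safe := append([]int(nil), first...) // the fix: copy values out before the next refill

	r.Next() // refill overwrites the shared buffer with [2 3]

	fmt.Println(first) // [2 3]: silently mutated, the re-use bug
	fmt.Println(safe)  // [0 1]: the clone survives the refill
}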
31 changes: 31 additions & 0 deletions pkg/iter/tree.go
@@ -0,0 +1,31 @@
+package iter
+
+import (
+	"github.com/grafana/phlare/pkg/util/loser"
+)
+
+var _ Iterator[interface{}] = &TreeIterator[interface{}]{}
+
+type TreeIterator[T any] struct {
+	*loser.Tree[T, Iterator[T]]
+}
+
+// NewTreeIterator returns an Iterator that iterates over the given loser tree iterator.
+func NewTreeIterator[T any](tree *loser.Tree[T, Iterator[T]]) *TreeIterator[T] {
+	return &TreeIterator[T]{
+		Tree: tree,
+	}
+}
+
+func (it TreeIterator[T]) At() T {
+	return it.Tree.Winner().At()
+}
+
+func (it *TreeIterator[T]) Err() error {
+	return it.Tree.Winner().Err()
+}
+
+func (it *TreeIterator[T]) Close() error {
+	it.Tree.Close()
+	return nil
+}
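
For orientation, the new TreeIterator adapts a loser tree (a tournament tree used for k-way merging) to this package's Iterator[T] interface, the usual Next/At/Err/Close quartet. A hedged usage sketch follows; sliceIterator and the merged helper are hypothetical, and only the loser.New argument order is taken from the call in pkg/parquet/row_reader.go below.

package example

import (
	"math"

	"github.com/grafana/phlare/pkg/iter"
	"github.com/grafana/phlare/pkg/util/loser"
)

// sliceIterator is a hypothetical in-memory Iterator[int], for this sketch only.
type sliceIterator struct {
	xs []int
	i  int
}

func (s *sliceIterator) Next() bool   { s.i++; return s.i <= len(s.xs) }
func (s *sliceIterator) At() int      { return s.xs[s.i-1] }
func (s *sliceIterator) Err() error   { return nil }
func (s *sliceIterator) Close() error { return nil }

// merged yields 1, 2, 4, 5, 7, 8: the tree repeatedly surfaces the smallest
// head element among the sorted child iterators.
func merged() iter.Iterator[int] {
	its := []iter.Iterator[int]{
		&sliceIterator{xs: []int{1, 4, 7}},
		&sliceIterator{xs: []int{2, 5, 8}},
	}
	return iter.NewTreeIterator(loser.New(
		its,
		math.MaxInt, // sentinel: sorts after every real element
		func(it iter.Iterator[int]) int { return it.At() },
		func(a, b int) bool { return a < b },
		func(it iter.Iterator[int]) { _ = it.Close() },
	))
}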
74 changes: 47 additions & 27 deletions pkg/parquet/row_reader.go
@@ -3,9 +3,11 @@ package parquet
 import (
 	"io"
 
+	"github.com/grafana/dskit/runutil"
 	"github.com/segmentio/parquet-go"
 
 	"github.com/grafana/phlare/pkg/iter"
+	"github.com/grafana/phlare/pkg/util"
 	"github.com/grafana/phlare/pkg/util/loser"
 )

@@ -16,7 +18,7 @@ const (
 var (
 	_ parquet.RowReader          = (*emptyRowReader)(nil)
 	_ parquet.RowReader          = (*ErrRowReader)(nil)
-	_ parquet.RowReader          = (*MergeRowReader)(nil)
+	_ parquet.RowReader          = (*IteratorRowReader)(nil)
 	_ iter.Iterator[parquet.Row] = (*BufferedRowReaderIterator)(nil)
 
 	EmptyRowReader = &emptyRowReader{}
@@ -32,10 +34,6 @@ func NewErrRowReader(err error) *ErrRowReader { return &ErrRowReader{err: err} }

 func (e ErrRowReader) ReadRows(rows []parquet.Row) (int, error) { return 0, e.err }
 
-type MergeRowReader struct {
-	tree *loser.Tree[parquet.Row, iter.Iterator[parquet.Row]]
-}
-
 // NewMergeRowReader returns a RowReader that k-way merges the given readers using the less function.
 // Each reader must be sorted according to the less function already.
 func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader {
@@ -50,18 +48,31 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader {
 		its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
 	}
 
-	return &MergeRowReader{
-		tree: loser.New(
-			its,
-			maxValue,
-			func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
-			less,
-			func(it iter.Iterator[parquet.Row]) { it.Close() },
-		),
-	}
-}
+	return NewIteratorRowReader(
+		iter.NewTreeIterator[parquet.Row](
+			loser.New(
+				its,
+				maxValue,
+				func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
+				less,
+				func(it iter.Iterator[parquet.Row]) { _ = it.Close() },
+			),
+		),
+	)
+}
+
+type IteratorRowReader struct {
+	iter.Iterator[parquet.Row]
+}
+
+// NewIteratorRowReader returns a RowReader that reads rows from the given iterator.
+func NewIteratorRowReader(it iter.Iterator[parquet.Row]) *IteratorRowReader {
+	return &IteratorRowReader{
+		Iterator: it,
+	}
+}
 
-func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
+func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) {
 	var n int
 	if len(rows) == 0 {
 		return 0, nil
@@ -70,21 +81,27 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
 		if n == len(rows) {
 			break
 		}
-		if !s.tree.Next() {
-			s.tree.Close()
-			if err := s.tree.Err(); err != nil {
-				return n, err
+		if !it.Next() {
+			runutil.CloseWithLogOnErr(util.Logger, it.Iterator, "failed to close iterator")
+			if it.Err() != nil {
+				return n, it.Err()
 			}
 			return n, io.EOF
 		}
-		rows[n] = s.tree.Winner().At()
+		rows[n] = rows[n][:0]
+		for _, c := range it.At() {
+			rows[n] = append(rows[n], c.Clone())
+		}
 		n++
 	}
 	return n, nil
 }
 
 type BufferedRowReaderIterator struct {
-	reader     parquet.RowReader
+	reader       parquet.RowReader
+	bufferedRows []parquet.Row
+
+	// buff keeps the original slice capacity to avoid allocations
 	buff       []parquet.Row
 	bufferSize int
 	err        error
@@ -100,16 +117,17 @@ func NewBufferedRowReaderIterator(reader parquet.RowReader, bufferSize int) *BufferedRowReaderIterator {
 }
 
 func (r *BufferedRowReaderIterator) Next() bool {
-	if len(r.buff) > 1 {
-		r.buff = r.buff[1:]
+	if len(r.bufferedRows) > 1 {
+		r.bufferedRows = r.bufferedRows[1:]
 		return true
 	}
 
+	// todo this seems to do allocations on every call since cap is always smaller
 	if cap(r.buff) < r.bufferSize {
 		r.buff = make([]parquet.Row, r.bufferSize)
 	}
-	r.buff = r.buff[:r.bufferSize]
-	n, err := r.reader.ReadRows(r.buff)
+	r.bufferedRows = r.buff[:r.bufferSize]
+	n, err := r.reader.ReadRows(r.bufferedRows)
 	if err != nil && err != io.EOF {
 		r.err = err
 		return false
@@ -118,15 +136,15 @@ func (r *BufferedRowReaderIterator) Next() bool {
 		return false
 	}
 
-	r.buff = r.buff[:n]
+	r.bufferedRows = r.bufferedRows[:n]
 	return true
 }
 
 func (r *BufferedRowReaderIterator) At() parquet.Row {
-	if len(r.buff) == 0 {
+	if len(r.bufferedRows) == 0 {
 		return parquet.Row{}
 	}
-	return r.buff[0]
+	return r.bufferedRows[0]
 }
 
 func (r *BufferedRowReaderIterator) Err() error {
@@ -150,7 +168,9 @@ func ReadAllWithBufferSize(r parquet.RowReader, bufferSize int) ([]parquet.Row, error) {
 		return rows, err
 	}
 	if n != 0 {
-		rows = append(rows, batch[:n]...)
+		for i := range batch[:n] {
+			rows = append(rows, batch[i].Clone())
+		}
 	}
 	if n == 0 || err == io.EOF {
 		break
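
Two copy points above make the returned rows safe to retain: IteratorRowReader.ReadRows rebuilds each destination row in place (rows[n][:0] reuses capacity, then appends cloned values), and ReadAllWithBufferSize clones every batched row. A sketch of the intended call shape, assuming the input readers are already sorted under the same less function; mergeByFirstColumn and its sentinel row are illustrative, not part of this PR:

// In package parquet (alongside NewMergeRowReader); only "math" and the
// segmentio parquet-go import are needed beyond what the file already has.
func mergeByFirstColumn(readers []parquet.RowReader) ([]parquet.Row, error) {
	merged := NewMergeRowReader(
		readers,
		// Sentinel row handed to the loser tree: it must sort after
		// every real row under the less function below.
		parquet.Row{parquet.Int64Value(math.MaxInt64)},
		func(a, b parquet.Row) bool { return a[0].Int64() < b[0].Int64() },
	)
	// Rows come back as deep copies, so they stay valid even though the
	// buffered iterators behind merged reuse their internal row buffers.
	return ReadAllWithBufferSize(merged, 64)
}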
45 changes: 45 additions & 0 deletions pkg/parquet/row_reader_test.go
@@ -141,3 +141,48 @@ func TestNewMergeRowReader(t *testing.T) {
 		})
 	}
 }
+
+func TestIteratorRowReader(t *testing.T) {
+	it := NewIteratorRowReader(
+		NewBufferedRowReaderIterator(NewBatchReader([][]parquet.Row{
+			{{parquet.Int32Value(1)}, {parquet.Int32Value(2)}, {parquet.Int32Value(3)}},
+			{{parquet.Int32Value(4)}, {parquet.Int32Value(5)}, {parquet.Int32Value(6)}},
+			{{parquet.Int32Value(7)}, {parquet.Int32Value(8)}, {parquet.Int32Value(9)}},
+		}), 4),
+	)
+	actual, err := ReadAllWithBufferSize(it, 3)
+	require.NoError(t, err)
+	require.Equal(t, []parquet.Row{
+		{parquet.Int32Value(1)},
+		{parquet.Int32Value(2)},
+		{parquet.Int32Value(3)},
+		{parquet.Int32Value(4)},
+		{parquet.Int32Value(5)},
+		{parquet.Int32Value(6)},
+		{parquet.Int32Value(7)},
+		{parquet.Int32Value(8)},
+		{parquet.Int32Value(9)},
+	}, actual)
+}
+
+type SomeRow struct {
+	Col1 int
+}
+
+func BenchmarkBufferedRowReader(b *testing.B) {
+	buff := parquet.NewGenericBuffer[SomeRow]()
+	for i := 0; i < 1000000; i++ {
+		_, err := buff.Write([]SomeRow{{Col1: (i)}})
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+	reader := NewBufferedRowReaderIterator(buff.Rows(), 100)
+	defer reader.Close()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		for reader.Next() {
+			_ = reader.At()
+		}
+	}
+}
4 changes: 2 additions & 2 deletions pkg/parquet/row_writer.go
@@ -6,15 +6,15 @@ import (
"github.com/segmentio/parquet-go"
)

type RowGroupWriter interface {
type RowWriterFlusher interface {
parquet.RowWriter
Flush() error
}

// CopyAsRowGroups copies row groups to dst from src and flush a rowgroup per rowGroupNumCount read.
// It returns the total number of rows copied and the number of row groups written.
// Flush is called to create a new row group.
func CopyAsRowGroups(dst RowGroupWriter, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
if rowGroupNumCount <= 0 {
panic("rowGroupNumCount must be positive")
}
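
The rename spells out the contract: the destination is any parquet.RowWriter that also has Flush, and CopyAsRowGroups calls Flush to cut a row group every rowGroupNumCount rows. A hedged caller sketch, assuming segmentio's *parquet.Writer (which provides WriteRows and Flush) as the destination; the rewrite helper, schema parameter, and 10_000 group size are illustrative:

package example

import (
	"io"
	"log"

	"github.com/segmentio/parquet-go"

	phlareparquet "github.com/grafana/phlare/pkg/parquet"
)

// rewrite copies src into dst, cutting a new row group every 10_000 rows.
func rewrite(dst io.Writer, src parquet.RowReader, schema *parquet.Schema) error {
	w := parquet.NewWriter(dst, schema) // assumed: *parquet.Writer satisfies RowWriterFlusher
	total, groups, err := phlareparquet.CopyAsRowGroups(w, src, 10_000)
	if err != nil {
		return err
	}
	log.Printf("copied %d rows in %d row groups", total, groups)
	return w.Close()
}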
2 changes: 1 addition & 1 deletion pkg/parquet/row_writer_test.go
@@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/require"
)

var _ RowGroupWriter = (*TestRowGroupWriter)(nil)
var _ RowWriterFlusher = (*TestRowGroupWriter)(nil)

type TestRowGroupWriter struct {
RowGroups [][]parquet.Row