diff --git a/pkg/iter/tree.go b/pkg/iter/tree.go
new file mode 100644
index 0000000000..4f66950873
--- /dev/null
+++ b/pkg/iter/tree.go
@@ -0,0 +1,31 @@
+package iter
+
+import (
+	"github.com/grafana/phlare/pkg/util/loser"
+)
+
+var _ Iterator[interface{}] = &TreeIterator[interface{}]{}
+
+type TreeIterator[T any] struct {
+	*loser.Tree[T, Iterator[T]]
+}
+
+// NewTreeIterator returns an Iterator that yields the successive winners of the given loser tree.
+func NewTreeIterator[T any](tree *loser.Tree[T, Iterator[T]]) *TreeIterator[T] {
+	return &TreeIterator[T]{
+		Tree: tree,
+	}
+}
+
+func (it *TreeIterator[T]) At() T {
+	return it.Tree.Winner().At()
+}
+
+func (it *TreeIterator[T]) Err() error {
+	return it.Tree.Winner().Err()
+}
+
+func (it *TreeIterator[T]) Close() error {
+	it.Tree.Close()
+	return nil
+}
diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go
index 893e3036cb..e914f9ace2 100644
--- a/pkg/parquet/row_reader.go
+++ b/pkg/parquet/row_reader.go
@@ -3,9 +3,11 @@ package parquet
 import (
 	"io"
 
+	"github.com/grafana/dskit/runutil"
 	"github.com/segmentio/parquet-go"
 
 	"github.com/grafana/phlare/pkg/iter"
+	"github.com/grafana/phlare/pkg/util"
 	"github.com/grafana/phlare/pkg/util/loser"
 )
 
@@ -16,7 +18,7 @@ const (
 var (
 	_ parquet.RowReader           = (*emptyRowReader)(nil)
 	_ parquet.RowReader           = (*ErrRowReader)(nil)
-	_ parquet.RowReader           = (*MergeRowReader)(nil)
+	_ parquet.RowReader           = (*IteratorRowReader)(nil)
 	_ iter.Iterator[parquet.Row]  = (*BufferedRowReaderIterator)(nil)
 
 	EmptyRowReader = &emptyRowReader{}
@@ -32,10 +34,6 @@ func NewErrRowReader(err error) *ErrRowReader { return &ErrRowReader{err: err} }
 
 func (e ErrRowReader) ReadRows(rows []parquet.Row) (int, error) { return 0, e.err }
 
-type MergeRowReader struct {
-	tree *loser.Tree[parquet.Row, iter.Iterator[parquet.Row]]
-}
-
 // NewMergeRowReader returns a RowReader that k-way merges the given readers using the less function.
 // Each reader must be sorted according to the less function already.
 func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader {
@@ -50,18 +48,31 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f
 		its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
 	}
 
-	return &MergeRowReader{
-		tree: loser.New(
-			its,
-			maxValue,
-			func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
-			less,
-			func(it iter.Iterator[parquet.Row]) { it.Close() },
+	return NewIteratorRowReader(
+		iter.NewTreeIterator[parquet.Row](
+			loser.New(
+				its,
+				maxValue,
+				func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
+				less,
+				func(it iter.Iterator[parquet.Row]) { _ = it.Close() },
+			),
 		),
+	)
+}
+
+type IteratorRowReader struct {
+	iter.Iterator[parquet.Row]
+}
+
+// NewIteratorRowReader returns a RowReader that reads rows from the given iterator.
+func NewIteratorRowReader(it iter.Iterator[parquet.Row]) *IteratorRowReader {
+	return &IteratorRowReader{
+		Iterator: it,
 	}
 }
 
-func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
+func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) {
 	var n int
 	if len(rows) == 0 {
 		return 0, nil
@@ -70,21 +81,27 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
 		if n == len(rows) {
 			break
 		}
-		if !s.tree.Next() {
-			s.tree.Close()
-			if err := s.tree.Err(); err != nil {
-				return n, err
+		if !it.Next() {
+			runutil.CloseWithLogOnErr(util.Logger, it.Iterator, "failed to close iterator")
+			if it.Err() != nil {
+				return n, it.Err()
 			}
 			return n, io.EOF
 		}
-		rows[n] = s.tree.Winner().At()
+		rows[n] = rows[n][:0]
+		for _, c := range it.At() {
+			rows[n] = append(rows[n], c.Clone())
+		}
 		n++
 	}
 	return n, nil
 }
 
 type BufferedRowReaderIterator struct {
-	reader     parquet.RowReader
+	reader       parquet.RowReader
+	bufferedRows []parquet.Row
+
+	// buff keeps the original slice capacity to avoid allocations
 	buff       []parquet.Row
 	bufferSize int
 	err        error
@@ -100,16 +117,17 @@ func NewBufferedRowReaderIterator(reader parquet.RowReader, bufferSize int) *Buf
 }
 
 func (r *BufferedRowReaderIterator) Next() bool {
-	if len(r.buff) > 1 {
-		r.buff = r.buff[1:]
+	if len(r.bufferedRows) > 1 {
+		r.bufferedRows = r.bufferedRows[1:]
 		return true
 	}
+
+	// The buffer is reused across calls; it is only reallocated when its capacity is too small.
 	if cap(r.buff) < r.bufferSize {
 		r.buff = make([]parquet.Row, r.bufferSize)
 	}
-	r.buff = r.buff[:r.bufferSize]
-	n, err := r.reader.ReadRows(r.buff)
+	r.bufferedRows = r.buff[:r.bufferSize]
+	n, err := r.reader.ReadRows(r.bufferedRows)
 	if err != nil && err != io.EOF {
 		r.err = err
 		return false
@@ -118,15 +136,15 @@
 		return false
 	}
 
-	r.buff = r.buff[:n]
+	r.bufferedRows = r.bufferedRows[:n]
 	return true
 }
 
 func (r *BufferedRowReaderIterator) At() parquet.Row {
-	if len(r.buff) == 0 {
+	if len(r.bufferedRows) == 0 {
 		return parquet.Row{}
 	}
-	return r.buff[0]
+	return r.bufferedRows[0]
 }
 
 func (r *BufferedRowReaderIterator) Err() error {
@@ -150,7 +168,9 @@ func ReadAllWithBufferSize(r parquet.RowReader, bufferSize int) ([]parquet.Row,
 			return rows, err
 		}
 		if n != 0 {
-			rows = append(rows, batch[:n]...)
+			for i := range batch[:n] {
+				rows = append(rows, batch[i].Clone())
+			}
 		}
 		if n == 0 || err == io.EOF {
 			break
diff --git a/pkg/parquet/row_reader_test.go b/pkg/parquet/row_reader_test.go
index 075d44bc71..d449c46093 100644
--- a/pkg/parquet/row_reader_test.go
+++ b/pkg/parquet/row_reader_test.go
@@ -141,3 +141,48 @@ func TestNewMergeRowReader(t *testing.T) {
 		})
 	}
 }
+
+func TestIteratorRowReader(t *testing.T) {
+	it := NewIteratorRowReader(
+		NewBufferedRowReaderIterator(NewBatchReader([][]parquet.Row{
+			{{parquet.Int32Value(1)}, {parquet.Int32Value(2)}, {parquet.Int32Value(3)}},
+			{{parquet.Int32Value(4)}, {parquet.Int32Value(5)}, {parquet.Int32Value(6)}},
+			{{parquet.Int32Value(7)}, {parquet.Int32Value(8)}, {parquet.Int32Value(9)}},
+		}), 4),
+	)
+	actual, err := ReadAllWithBufferSize(it, 3)
+	require.NoError(t, err)
+	require.Equal(t, []parquet.Row{
+		{parquet.Int32Value(1)},
+		{parquet.Int32Value(2)},
+		{parquet.Int32Value(3)},
+		{parquet.Int32Value(4)},
+		{parquet.Int32Value(5)},
+		{parquet.Int32Value(6)},
+		{parquet.Int32Value(7)},
+		{parquet.Int32Value(8)},
+		{parquet.Int32Value(9)},
+	}, actual)
+}
+
+type SomeRow struct {
+	Col1 int
+}
+
+func BenchmarkBufferedRowReader(b *testing.B) {
+	buff := parquet.NewGenericBuffer[SomeRow]()
+	for i := 0; i < 1000000; i++ {
+		_, err := buff.Write([]SomeRow{{Col1: i}})
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		reader := NewBufferedRowReaderIterator(buff.Rows(), 100)
+		for reader.Next() {
+			_ = reader.At()
+		}
+		_ = reader.Close()
+	}
+}
diff --git a/pkg/parquet/row_writer.go b/pkg/parquet/row_writer.go
index eebc76590d..54bf3abbdb 100644
--- a/pkg/parquet/row_writer.go
+++ b/pkg/parquet/row_writer.go
@@ -6,7 +6,7 @@ import (
 	"github.com/segmentio/parquet-go"
 )
 
-type RowGroupWriter interface {
+type RowWriterFlusher interface {
 	parquet.RowWriter
 	Flush() error
 }
@@ -14,7 +14,7 @@ type RowGroupWriter interface {
 // CopyAsRowGroups copies row groups to dst from src and flush a rowgroup per rowGroupNumCount read.
 // It returns the total number of rows copied and the number of row groups written.
 // Flush is called to create a new row group.
-func CopyAsRowGroups(dst RowGroupWriter, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
+func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
 	if rowGroupNumCount <= 0 {
 		panic("rowGroupNumCount must be positive")
 	}
diff --git a/pkg/parquet/row_writer_test.go b/pkg/parquet/row_writer_test.go
index 8b6ddb52ee..285cc6ab9b 100644
--- a/pkg/parquet/row_writer_test.go
+++ b/pkg/parquet/row_writer_test.go
@@ -7,7 +7,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-var _ RowGroupWriter = (*TestRowGroupWriter)(nil)
+var _ RowWriterFlusher = (*TestRowGroupWriter)(nil)
 
 type TestRowGroupWriter struct {
 	RowGroups [][]parquet.Row
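Usage sketch (not part of the patch): a minimal, self-contained example of how the refactored surface composes end to end, assuming two readers that are already sorted by their first column. The `sliceRowReader` helper, the single-int32 column layout, and the `math.MaxInt32` sentinel are illustrative assumptions; only `NewMergeRowReader` and `ReadAllWithBufferSize` come from the files changed above.

```go
package main

import (
	"fmt"
	"io"
	"math"

	"github.com/segmentio/parquet-go"

	phlareparquet "github.com/grafana/phlare/pkg/parquet"
)

// sliceRowReader is a throwaway parquet.RowReader over in-memory rows,
// used here only to exercise the merge path. It is hypothetical, not part of the patch.
type sliceRowReader struct{ rows []parquet.Row }

func (r *sliceRowReader) ReadRows(out []parquet.Row) (int, error) {
	if len(r.rows) == 0 {
		return 0, io.EOF
	}
	n := copy(out, r.rows)
	r.rows = r.rows[n:]
	return n, nil
}

func main() {
	less := func(a, b parquet.Row) bool { return a[0].Int32() < b[0].Int32() }
	// maxValue is the sentinel the loser tree yields for exhausted readers;
	// it must compare greater than or equal to every real row under less.
	maxValue := parquet.Row{parquet.Int32Value(math.MaxInt32)}

	merged := phlareparquet.NewMergeRowReader([]parquet.RowReader{
		&sliceRowReader{rows: []parquet.Row{{parquet.Int32Value(1)}, {parquet.Int32Value(3)}}},
		&sliceRowReader{rows: []parquet.Row{{parquet.Int32Value(2)}, {parquet.Int32Value(4)}}},
	}, maxValue, less)

	rows, err := phlareparquet.ReadAllWithBufferSize(merged, 2)
	if err != nil {
		panic(err)
	}
	for _, row := range rows {
		fmt.Println(row[0].Int32()) // prints 1 2 3 4, one per line
	}
}
```

The cloning in `IteratorRowReader.ReadRows` and `ReadAllWithBufferSize` exists because `BufferedRowReaderIterator` reuses its backing buffer between `Next` calls; without `Clone`, rows already handed to the caller could be overwritten by the next buffer fill.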