From fb1455739d9a7b94d354155e7dda6ddf56f51f85 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 16 Aug 2022 14:24:59 -0400 Subject: [PATCH] ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887) Authored-by: Matt Topol Signed-off-by: Matt Topol --- go/parquet/file/column_reader.go | 18 +++++++++++++----- go/parquet/file/column_reader_test.go | 19 ++++++++++++++++++- go/parquet/file/column_writer_test.go | 20 +++++++++++++++++++- go/parquet/file/file_reader.go | 23 +++++++++++++++++++++++ go/parquet/file/record_reader.go | 25 +++++++++++++------------ go/parquet/file/row_group_reader.go | 5 ++++- go/parquet/pqarrow/column_readers.go | 5 +++-- go/parquet/pqarrow/file_reader.go | 6 +++--- 8 files changed, 96 insertions(+), 25 deletions(-) diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go index 32fa21adeae3f..cfc0b224324af 100644 --- a/go/parquet/file/column_reader.go +++ b/go/parquet/file/column_reader.go @@ -18,6 +18,7 @@ package file import ( "fmt" + "sync" "github.com/apache/arrow/go/v10/arrow/memory" "github.com/apache/arrow/go/v10/internal/utils" @@ -125,6 +126,7 @@ type columnChunkReader struct { // the number of values we've decoded so far numDecoded int64 mem memory.Allocator + bufferPool *sync.Pool decoders map[format.Encoding]encoding.TypedDecoder decoderTraits encoding.DecoderTraits @@ -136,8 +138,12 @@ type columnChunkReader struct { // NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will // provide the pages of data for this column. The type is determined from the column passed in. -func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader { - base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder)} +// +// In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary +// usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary, buffers +// should have `ResizeNoShrink(0)` called on them before being put back into the pool. 
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader { + base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool} switch descr.PhysicalType() { case parquet.Types.FixedLenByteArray: base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits @@ -435,15 +441,17 @@ func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, b valsRead int64 = 0 ) - // TODO(ARROW-16790): ideally we should re-use a shared pool of buffers to avoid unnecessary memory allocation for skips - scratch := memory.NewResizableBuffer(c.mem) + scratch := c.bufferPool.Get().(*memory.Buffer) + defer func() { + scratch.ResizeNoShrink(0) + c.bufferPool.Put(scratch) + }() bufMult := 1 if c.descr.PhysicalType() == parquet.Types.Boolean { // for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool bufMult = 8 } scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult)) - defer scratch.Release() for { batchSize = utils.Min(batchSize, toskip) diff --git a/go/parquet/file/column_reader_test.go b/go/parquet/file/column_reader_test.go index eb3409942f61d..c0b727ed0cc23 100755 --- a/go/parquet/file/column_reader_test.go +++ b/go/parquet/file/column_reader_test.go @@ -20,6 +20,8 @@ import ( "math" "math/rand" "reflect" + "runtime" + "sync" "testing" "github.com/apache/arrow/go/v10/arrow/memory" @@ -173,10 +175,25 @@ type PrimitiveReaderSuite struct { nvalues int maxDefLvl int16 maxRepLvl int16 + + bufferPool sync.Pool +} + +func (p *PrimitiveReaderSuite) SetupTest() { + p.bufferPool = sync.Pool{ + New: func() interface{} { + buf := memory.NewResizableBuffer(mem) + runtime.SetFinalizer(buf, func(obj *memory.Buffer) { + obj.Release() + }) + return buf + }, + } } func (p *PrimitiveReaderSuite) TearDownTest() { p.clear() + p.bufferPool = sync.Pool{} } func (p *PrimitiveReaderSuite) initReader(d *schema.Column) { @@ -185,7 +202,7 @@ func (p *PrimitiveReaderSuite) initReader(d *schema.Column) { m.TestData().Set("pages", p.pages) m.On("Err").Return((error)(nil)) p.pager = m - p.reader = file.NewColumnReader(d, m, mem) + p.reader = file.NewColumnReader(d, m, mem, &p.bufferPool) } func (p *PrimitiveReaderSuite) checkResults(typ reflect.Type) { diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go index dc141ba1d3dc9..39eeb06f23c0c 100755 --- a/go/parquet/file/column_writer_test.go +++ b/go/parquet/file/column_writer_test.go @@ -20,6 +20,8 @@ import ( "bytes" "math" "reflect" + "runtime" + "sync" "testing" "github.com/apache/arrow/go/v10/arrow/bitutil" @@ -223,6 +225,8 @@ type PrimitiveWriterTestSuite struct { metadata *metadata.ColumnChunkMetaDataBuilder sink *encoding.BufferWriter readbuffer *memory.Buffer + + bufferPool sync.Pool } func (p *PrimitiveWriterTestSuite) SetupTest() { @@ -230,12 +234,26 @@ func (p *PrimitiveWriterTestSuite) SetupTest() { p.props = parquet.NewWriterProperties() p.SetupSchema(parquet.Repetitions.Required, 1) p.descr = p.Schema.Column(0) + + p.bufferPool = sync.Pool{ + New: func() interface{} { + buf := memory.NewResizableBuffer(mem) + runtime.SetFinalizer(buf, func(obj *memory.Buffer) { + obj.Release() + }) + return buf + }, + } +} + +func (p *PrimitiveWriterTestSuite) TearDownTest() { + p.bufferPool = sync.Pool{} } func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) 
file.ColumnChunkReader { p.readbuffer = p.sink.Finish() pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil) - return file.NewColumnReader(p.descr, pagereader, mem) + return file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool) } func (p *PrimitiveWriterTestSuite) buildWriter(_ int64, columnProps parquet.ColumnProperties, version parquet.Version) file.ColumnChunkWriter { diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go index a7e6525ace328..d9a73faa63288 100644 --- a/go/parquet/file/file_reader.go +++ b/go/parquet/file/file_reader.go @@ -22,6 +22,8 @@ import ( "fmt" "io" "os" + "runtime" + "sync" "github.com/apache/arrow/go/v10/arrow/memory" "github.com/apache/arrow/go/v10/parquet" @@ -47,6 +49,8 @@ type Reader struct { metadata *metadata.FileMetaData footerOffset int64 fileDecryptor encryption.FileDecryptor + + bufferPool sync.Pool } type ReadOption func(*Reader) @@ -113,6 +117,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er f.props = parquet.NewReaderProperties(memory.NewGoAllocator()) } + f.bufferPool = sync.Pool{ + New: func() interface{} { + buf := memory.NewResizableBuffer(f.props.Allocator()) + runtime.SetFinalizer(buf, func(obj *memory.Buffer) { + obj.Release() + }) + return buf + }, + } + if f.metadata == nil { return f, f.parseMetaData() } @@ -120,6 +134,14 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er return f, nil } +// BufferPool returns the internal buffer pool being utilized by this reader. +// This is primarily for use by the pqarrow.FileReader or anything that builds +// on top of the Reader and constructs their own ColumnReaders (like the +// RecordReader) +func (f *Reader) BufferPool() *sync.Pool { + return &f.bufferPool +} + // Close will close the current reader, and if the underlying reader being used // is an `io.Closer` then Close will be called on it too. 
func (f *Reader) Close() error { @@ -290,5 +312,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader { r: f.r, sourceSz: f.footerOffset, fileDecryptor: f.fileDecryptor, + bufferPool: &f.bufferPool, } } diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go index 7daefac457bc4..3e45ee915fecf 100755 --- a/go/parquet/file/record_reader.go +++ b/go/parquet/file/record_reader.go @@ -18,6 +18,7 @@ package file import ( "fmt" + "sync" "sync/atomic" "unsafe" @@ -127,9 +128,9 @@ type primitiveRecordReader struct { useValues bool } -func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader { +func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader { return primitiveRecordReader{ - ColumnChunkReader: NewColumnReader(descr, nil, mem), + ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool), values: memory.NewResizableBuffer(mem), validBits: memory.NewResizableBuffer(mem), mem: mem, @@ -326,12 +327,12 @@ func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array { return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks() } -func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { +func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader { if mem == nil { mem = memory.DefaultAllocator } - pr := createPrimitiveRecordReader(descr, mem) + pr := createPrimitiveRecordReader(descr, mem, bufferPool) return &recordReader{ refCount: 1, recordReaderImpl: &pr, @@ -722,7 +723,7 @@ func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array { return []arrow.Array{fr.bldr.NewArray()} } -func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { +func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader { if mem == nil { mem = memory.DefaultAllocator } @@ -731,7 +732,7 @@ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Alloca return &binaryRecordReader{&recordReader{ recordReaderImpl: &flbaRecordReader{ - createPrimitiveRecordReader(descr, mem), + createPrimitiveRecordReader(descr, mem, bufferPool), array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}), nil, }, @@ -750,7 +751,7 @@ type byteArrayRecordReader struct { valueBuf []parquet.ByteArray } -func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { +func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader { if mem == nil { mem = memory.DefaultAllocator } @@ -762,7 +763,7 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.A return &binaryRecordReader{&recordReader{ recordReaderImpl: &byteArrayRecordReader{ - createPrimitiveRecordReader(descr, mem), + createPrimitiveRecordReader(descr, mem, bufferPool), array.NewBinaryBuilder(mem, dt), nil, }, @@ -840,13 +841,13 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array { // TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done -func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader { +func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader { switch descr.PhysicalType() { case 
parquet.Types.ByteArray: - return newByteArrayRecordReader(descr, info, mem) + return newByteArrayRecordReader(descr, info, mem, bufferPool) case parquet.Types.FixedLenByteArray: - return newFLBARecordReader(descr, info, mem) + return newFLBARecordReader(descr, info, mem, bufferPool) default: - return newRecordReader(descr, info, mem) + return newRecordReader(descr, info, mem, bufferPool) } } diff --git a/go/parquet/file/row_group_reader.go b/go/parquet/file/row_group_reader.go index 71c71ec38eff5..b2b5bcf15581f 100644 --- a/go/parquet/file/row_group_reader.go +++ b/go/parquet/file/row_group_reader.go @@ -18,6 +18,7 @@ package file import ( "fmt" + "sync" "github.com/apache/arrow/go/v10/internal/utils" "github.com/apache/arrow/go/v10/parquet" @@ -38,6 +39,8 @@ type RowGroupReader struct { rgMetadata *metadata.RowGroupMetaData props *parquet.ReaderProperties fileDecryptor encryption.FileDecryptor + + bufferPool *sync.Pool } // MetaData returns the metadata of the current Row Group @@ -65,7 +68,7 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) { if err != nil { return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err) } - return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil + return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil } func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) { diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go index b298e2b4c9f33..73577b616ee63 100644 --- a/go/parquet/pqarrow/column_readers.go +++ b/go/parquet/pqarrow/column_readers.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "fmt" "reflect" + "sync" "sync/atomic" "time" "unsafe" @@ -50,13 +51,13 @@ type leafReader struct { refCount int64 } -func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties) (*ColumnReader, error) { +func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) { ret := &leafReader{ rctx: rctx, field: field, input: input, descr: input.Descr(), - recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem), + recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool), props: props, refCount: 1, } diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go index 7d345d6187f08..f62b4571b8e9e 100755 --- a/go/parquet/pqarrow/file_reader.go +++ b/go/parquet/pqarrow/file_reader.go @@ -210,7 +210,7 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups // greatly improves performance. // GetFieldReader causes read operations, when issued serially on large numbers of columns, // this is super time consuming. Get field readers concurrently. 
- g,gctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContext(ctx) if !fr.Props.Parallel { g.SetLimit(1) } @@ -482,7 +482,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi return nil, nil } - out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props) + out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool()) return } @@ -499,7 +499,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi // When reading structs with large numbers of columns, the serial load is very slow. // This is especially true when reading Cloud Storage. Loading concurrently // greatly improves performance. - g,gctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContext(ctx) if !fr.Props.Parallel { g.SetLimit(1) }
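
For readers building their own ColumnChunkReader or RecordReader on top of file.Reader (via Reader.BufferPool()), the buffer-pool contract that NewColumnReader now expects can be summarized by the minimal sketch below. This is illustrative only, not part of the patch: the pool construction mirrors the one added to file.Reader, memory.DefaultAllocator stands in for the reader's configured allocator, and the 1024-byte reservation is a placeholder for what decoderTraits.BytesRequired would compute in skipValues.

package main

import (
	"runtime"
	"sync"

	"github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
	// Shared pool of resizable scratch buffers, mirroring the pool the patch
	// adds to file.Reader: each buffer gets a finalizer so it is released
	// once the pool (and GC) drop it.
	bufferPool := sync.Pool{
		New: func() interface{} {
			buf := memory.NewResizableBuffer(memory.DefaultAllocator)
			runtime.SetFinalizer(buf, func(obj *memory.Buffer) { obj.Release() })
			return buf
		},
	}

	// Usage discipline expected by NewColumnReader (as in skipValues):
	// take a buffer, reserve scratch space, decode into it, then shrink the
	// length back to zero before returning it to the pool.
	scratch := bufferPool.Get().(*memory.Buffer)
	scratch.Reserve(1024) // placeholder size; real code sizes this per batch
	_ = scratch.Buf()     // decoders would write skipped values here
	scratch.ResizeNoShrink(0)
	bufferPool.Put(scratch)
}

Because ResizeNoShrink(0) resets the length without giving back capacity, buffers returned to the pool keep their allocation, which is what lets repeated skips reuse memory instead of allocating a fresh scratch buffer each time.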