From 699d26242e37ea7ecfe25163caa35b7d52e14f3b Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 29 Aug 2023 11:34:22 -0400 Subject: [PATCH 01/10] feat(bigquery): detach storage api iterator from arrow decoding --- bigquery/arrow.go | 63 ++++++++++++------ bigquery/iterator.go | 3 +- bigquery/storage_bench_test.go | 2 +- bigquery/storage_integration_test.go | 12 ++-- bigquery/storage_iterator.go | 95 +++++++++++++++------------- bigquery/storage_iterator_test.go | 2 +- 6 files changed, 105 insertions(+), 72 deletions(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index c5120901b530..51c77d476cbc 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -17,8 +17,8 @@ package bigquery import ( "bytes" "encoding/base64" - "errors" "fmt" + "io" "math/big" "cloud.google.com/go/civil" @@ -27,18 +27,42 @@ import ( "github.com/apache/arrow/go/v12/arrow/ipc" ) -type arrowDecoder struct { - tableSchema Schema - rawArrowSchema []byte - arrowSchema *arrow.Schema +// ArrowRecordBatch represents an Arrow RecordBatch with the source PartitionID +type ArrowRecordBatch struct { + reader io.Reader + // Serialized Arrow Record Batch. + Data []byte + // Serialized Arrow Schema. + Schema []byte + // Source partition ID. In the Storage API world, it represents the ReadStream. + PartitionID string } -func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDecoder, error) { - bqSession := session.bqSession - if bqSession == nil { - return nil, errors.New("read session not initialized") +// Read makes ArrowRecordBatch implements io.Reader +func (r *ArrowRecordBatch) Read(p []byte) (int, error) { + if r.reader == nil { + buf := bytes.NewBuffer(r.Schema) + buf.Write(r.Data) + r.reader = buf } - arrowSerializedSchema := bqSession.GetArrowSchema().GetSerializedSchema() + return r.reader.Read(p) +} + +// ArrowIterator represents a way to iterate through a stream of arrow records. +// Experimental: this interface is experimental and may be modified or removed in future versions, +// regardless of any other documented package stability guarantees. +type ArrowIterator interface { + Next() (*ArrowRecordBatch, error) + Schema() Schema + SerializedArrowSchema() []byte +} + +type arrowDecoder struct { + tableSchema Schema + arrowSchema *arrow.Schema +} + +func newArrowDecoder(arrowSerializedSchema []byte, schema Schema) (*arrowDecoder, error) { buf := bytes.NewBuffer(arrowSerializedSchema) r, err := ipc.NewReader(buf) if err != nil { @@ -46,22 +70,19 @@ func newArrowDecoderFromSession(session *readSession, schema Schema) (*arrowDeco } defer r.Release() p := &arrowDecoder{ - tableSchema: schema, - rawArrowSchema: arrowSerializedSchema, - arrowSchema: r.Schema(), + tableSchema: schema, + arrowSchema: r.Schema(), } return p, nil } -func (ap *arrowDecoder) createIPCReaderForBatch(serializedArrowRecordBatch []byte) (*ipc.Reader, error) { - buf := bytes.NewBuffer(ap.rawArrowSchema) - buf.Write(serializedArrowRecordBatch) - return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema)) +func (ap *arrowDecoder) createIPCReaderForBatch(arrowRecordBatch *ArrowRecordBatch) (*ipc.Reader, error) { + return ipc.NewReader(arrowRecordBatch, ipc.WithSchema(ap.arrowSchema)) } // decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value. -func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([][]Value, error) { - r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) +func (ap *arrowDecoder) decodeArrowRecords(arrowRecordBatch *ArrowRecordBatch) ([][]Value, error) { + r, err := ap.createIPCReaderForBatch(arrowRecordBatch) if err != nil { return nil, err } @@ -79,8 +100,8 @@ func (ap *arrowDecoder) decodeArrowRecords(serializedArrowRecordBatch []byte) ([ } // decodeRetainedArrowRecords decodes BQ ArrowRecordBatch into a list of retained arrow.Record. -func (ap *arrowDecoder) decodeRetainedArrowRecords(serializedArrowRecordBatch []byte) ([]arrow.Record, error) { - r, err := ap.createIPCReaderForBatch(serializedArrowRecordBatch) +func (ap *arrowDecoder) decodeRetainedArrowRecords(arrowRecordBatch *ArrowRecordBatch) ([]arrow.Record, error) { + r, err := ap.createIPCReaderForBatch(arrowRecordBatch) if err != nil { return nil, err } diff --git a/bigquery/iterator.go b/bigquery/iterator.go index b5823e7f4a73..37b12dea84d4 100644 --- a/bigquery/iterator.go +++ b/bigquery/iterator.go @@ -44,7 +44,8 @@ type RowIterator struct { ctx context.Context src *rowSource - arrowIterator *arrowIterator + arrowIterator ArrowIterator + arrowDecoder *arrowDecoder pageInfo *iterator.PageInfo nextFunc func() error diff --git a/bigquery/storage_bench_test.go b/bigquery/storage_bench_test.go index 690a561f02fc..53c9feea5bea 100644 --- a/bigquery/storage_bench_test.go +++ b/bigquery/storage_bench_test.go @@ -74,7 +74,7 @@ func BenchmarkIntegration_StorageReadQuery(b *testing.B) { } } b.ReportMetric(float64(it.TotalRows), "rows") - bqSession := it.arrowIterator.session.bqSession + bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession b.ReportMetric(float64(len(bqSession.Streams)), "parallel_streams") b.ReportMetric(float64(maxStreamCount), "max_streams") } diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 88924ec758a9..3d99c85059b1 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -250,11 +250,12 @@ func TestIntegration_StorageReadQueryOrdering(t *testing.T) { } total++ // as we read the first value separately - bqSession := it.arrowIterator.session.bqSession + session := it.arrowIterator.(*storageArrowIterator).session + bqSession := session.bqSession if len(bqSession.Streams) == 0 { t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams)) } - streamSettings := it.arrowIterator.session.settings.maxStreamCount + streamSettings := session.settings.maxStreamCount if tc.maxExpectedStreams > 0 { if streamSettings > tc.maxExpectedStreams { t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings) @@ -317,7 +318,7 @@ func TestIntegration_StorageReadQueryStruct(t *testing.T) { total++ } - bqSession := it.arrowIterator.session.bqSession + bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession if len(bqSession.Streams) == 0 { t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) } @@ -366,7 +367,7 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) { } total++ // as we read the first value separately - bqSession := it.arrowIterator.session.bqSession + bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession if len(bqSession.Streams) == 0 { t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) } @@ -418,7 +419,8 @@ func TestIntegration_StorageReadCancel(t *testing.T) { } // resources are cleaned asynchronously time.Sleep(time.Second) - if !it.arrowIterator.isDone() { + arrowIt := it.arrowIterator.(*storageArrowIterator) + if !arrowIt.isDone() { t.Fatal("expected stream to be done") } } diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go index c0ef9fedfb37..cacd1c7858eb 100644 --- a/bigquery/storage_iterator.go +++ b/bigquery/storage_iterator.go @@ -32,20 +32,21 @@ import ( "google.golang.org/grpc/status" ) -// arrowIterator is a raw interface for getting data from Storage Read API -type arrowIterator struct { - done uint32 // atomic flag - errs chan error - ctx context.Context +// storageArrowIterator is a raw interface for getting data from Storage Read API +type storageArrowIterator struct { + done uint32 // atomic flag + initialized bool + errs chan error + ctx context.Context - schema Schema - decoder *arrowDecoder - records chan arrowRecordBatch + schema Schema + rawSchema []byte + records chan *ArrowRecordBatch session *readSession } -type arrowRecordBatch []byte +var _ ArrowIterator = &storageArrowIterator{} func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered bool) (*RowIterator, error) { md, err := table.Metadata(ctx) @@ -56,11 +57,19 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b if err != nil { return nil, err } - it, err := newStorageRowIterator(rs) + it, err := newStorageRowIterator(rs, md.Schema) if err != nil { return nil, err } - it.arrowIterator.schema = md.Schema + if rs.bqSession == nil { + return nil, errors.New("read session not initialized") + } + arrowSerializedSchema := rs.bqSession.GetArrowSchema().GetSerializedSchema() + dec, err := newArrowDecoder(arrowSerializedSchema, md.Schema) + if err != nil { + return nil, err + } + it.arrowDecoder = dec it.Schema = md.Schema return it, nil } @@ -112,11 +121,12 @@ func resolveLastChildSelectJob(ctx context.Context, job *Job) (*Job, error) { return childJobs[0], nil } -func newRawStorageRowIterator(rs *readSession) (*arrowIterator, error) { - arrowIt := &arrowIterator{ +func newRawStorageRowIterator(rs *readSession, schema Schema) (*storageArrowIterator, error) { + arrowIt := &storageArrowIterator{ ctx: rs.ctx, session: rs, - records: make(chan arrowRecordBatch, rs.settings.maxWorkerCount+1), + schema: schema, + records: make(chan *ArrowRecordBatch, rs.settings.maxWorkerCount+1), errs: make(chan error, rs.settings.maxWorkerCount+1), } if rs.bqSession == nil { @@ -125,11 +135,12 @@ func newRawStorageRowIterator(rs *readSession) (*arrowIterator, error) { return nil, err } } + arrowIt.rawSchema = rs.bqSession.GetArrowSchema().GetSerializedSchema() return arrowIt, nil } -func newStorageRowIterator(rs *readSession) (*RowIterator, error) { - arrowIt, err := newRawStorageRowIterator(rs) +func newStorageRowIterator(rs *readSession, schema Schema) (*RowIterator, error) { + arrowIt, err := newRawStorageRowIterator(rs, schema) if err != nil { return nil, err } @@ -153,8 +164,7 @@ func nextFuncForStorageIterator(it *RowIterator) func() error { if len(it.rows) > 0 { return nil } - arrowIt := it.arrowIterator - record, err := arrowIt.next() + record, err := it.arrowIterator.Next() if err == iterator.Done { if len(it.rows) == 0 { return iterator.Done @@ -165,9 +175,9 @@ func nextFuncForStorageIterator(it *RowIterator) func() error { return err } if it.Schema == nil { - it.Schema = it.arrowIterator.schema + it.Schema = it.arrowIterator.Schema() } - rows, err := arrowIt.decoder.decodeArrowRecords(record) + rows, err := it.arrowDecoder.decodeArrowRecords(record) if err != nil { return err } @@ -176,8 +186,8 @@ func nextFuncForStorageIterator(it *RowIterator) func() error { } } -func (it *arrowIterator) init() error { - if it.decoder != nil { // Already initialized +func (it *storageArrowIterator) init() error { + if it.initialized { return nil } @@ -191,20 +201,6 @@ func (it *arrowIterator) init() error { return iterator.Done } - if it.schema == nil { - meta, err := it.session.table.Metadata(it.ctx) - if err != nil { - return err - } - it.schema = meta.Schema - } - - decoder, err := newArrowDecoderFromSession(it.session, it.schema) - if err != nil { - return err - } - it.decoder = decoder - wg := sync.WaitGroup{} wg.Add(len(streams)) sem := semaphore.NewWeighted(int64(it.session.settings.maxWorkerCount)) @@ -229,18 +225,19 @@ func (it *arrowIterator) init() error { }(readStream.Name) } }() + it.initialized = true return nil } -func (it *arrowIterator) markDone() { +func (it *storageArrowIterator) markDone() { atomic.StoreUint32(&it.done, 1) } -func (it *arrowIterator) isDone() bool { +func (it *storageArrowIterator) isDone() bool { return atomic.LoadUint32(&it.done) != 0 } -func (it *arrowIterator) processStream(readStream string) { +func (it *storageArrowIterator) processStream(readStream string) { bo := gax.Backoff{} var offset int64 for { @@ -292,7 +289,7 @@ func retryReadRows(bo gax.Backoff, err error) (time.Duration, bool) { return bo.Pause(), false } -func (it *arrowIterator) consumeRowStream(readStream string, rowStream storagepb.BigQueryRead_ReadRowsClient, offset int64) (int64, error) { +func (it *storageArrowIterator) consumeRowStream(readStream string, rowStream storagepb.BigQueryRead_ReadRowsClient, offset int64) (int64, error) { for { r, err := rowStream.Recv() if err != nil { @@ -303,8 +300,12 @@ func (it *arrowIterator) consumeRowStream(readStream string, rowStream storagepb } if r.RowCount > 0 { offset += r.RowCount - arrowRecordBatch := r.GetArrowRecordBatch() - it.records <- arrowRecordBatch.SerializedRecordBatch + recordBatch := r.GetArrowRecordBatch() + it.records <- &ArrowRecordBatch{ + PartitionID: readStream, + Schema: it.rawSchema, + Data: recordBatch.SerializedRecordBatch, + } } } } @@ -312,7 +313,7 @@ func (it *arrowIterator) consumeRowStream(readStream string, rowStream storagepb // next return the next batch of rows as an arrow.Record. // Accessing Arrow Records directly has the drawnback of having to deal // with memory management. -func (it *arrowIterator) next() (arrowRecordBatch, error) { +func (it *storageArrowIterator) Next() (*ArrowRecordBatch, error) { if err := it.init(); err != nil { return nil, err } @@ -335,6 +336,14 @@ func (it *arrowIterator) next() (arrowRecordBatch, error) { } } +func (it *storageArrowIterator) SerializedArrowSchema() []byte { + return it.rawSchema +} + +func (it *storageArrowIterator) Schema() Schema { + return it.schema +} + // IsAccelerated check if the current RowIterator is // being accelerated by Storage API. func (it *RowIterator) IsAccelerated() bool { diff --git a/bigquery/storage_iterator_test.go b/bigquery/storage_iterator_test.go index 03293af069bc..f405e8a998fc 100644 --- a/bigquery/storage_iterator_test.go +++ b/bigquery/storage_iterator_test.go @@ -94,7 +94,7 @@ func TestStorageIteratorRetry(t *testing.T) { return &testReadRowsClient{}, nil }, bqSession: &storagepb.ReadSession{}, - }) + }, Schema{}) if err != nil { t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err) } From 8ad2672badf9a4582c8ba4bdcc62a6ac0ddec494 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 29 Aug 2023 11:35:03 -0400 Subject: [PATCH 02/10] feat(bigquery): make ArrowIterator public and add integration tests --- bigquery/storage_integration_test.go | 106 +++++++++++++++++++++++++++ bigquery/storage_iterator.go | 12 +++ 2 files changed, 118 insertions(+) diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 3d99c85059b1..26c124b463a0 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -15,13 +15,19 @@ package bigquery import ( + "bytes" "context" "errors" "fmt" + "io" "testing" "time" "cloud.google.com/go/internal/testutil" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/ipc" + "github.com/apache/arrow/go/v12/arrow/math" "github.com/google/go-cmp/cmp" "google.golang.org/api/iterator" ) @@ -425,6 +431,106 @@ func TestIntegration_StorageReadCancel(t *testing.T) { } } +func TestIntegration_StorageReadArrow(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + table := "`bigquery-public-data.usa_names.usa_1910_current`" + sql := fmt.Sprintf(`SELECT name, number, state FROM %s where state = "CA"`, table) + + q := storageOptimizedClient.Query(sql) + job, err := q.Run(ctx) // force usage of Storage API by skipping fast paths + if err != nil { + t.Fatal(err) + } + it, err := job.Read(ctx) + if err != nil { + t.Fatal(err) + } + arrowIt, err := it.ArrowIterator() + if err != nil { + t.Fatalf("expected iterator to be accelerated: %v", err) + } + var arrowSchema *arrow.Schema + records := []arrow.Record{} + r, err := ipc.NewReader(&arrowIteratorReader{ + it: arrowIt, + }) + numrec := 0 + for r.Next() { + rec := r.Record() + rec.Retain() + records = append(records, rec) + numrec += int(rec.NumRows()) + } + arrowSchema = r.Schema() + r.Release() + t.Logf("decoded %d records", numrec) + + if arrowSchema == nil { + t.Fatal("should have Arrow table available, but nil found") + } + t.Logf("decoded total of %d record batches", len(records)) + arrowTable := array.NewTableFromRecords(arrowSchema, records) + defer arrowTable.Release() + if arrowTable.NumRows() != int64(it.TotalRows) { + t.Fatalf("should have a table with %d rows, but found %d", it.TotalRows, arrowTable.NumRows()) + } + if arrowTable.NumCols() != 3 { + t.Fatalf("should have a table with 3 columns, but found %d", arrowTable.NumCols()) + } + + sumSQL := fmt.Sprintf(`SELECT sum(number) as total FROM %s where state = "CA"`, table) + sumQuery := client.Query(sumSQL) + sumIt, err := sumQuery.Read(ctx) + if err != nil { + t.Fatal(err) + } + sumValues := []Value{} + err = sumIt.Next(&sumValues) + if err != nil { + t.Fatal(err) + } + totalFromSQL := sumValues[0].(int64) + + tr := array.NewTableReader(arrowTable, arrowTable.NumRows()) + var totalFromArrow int64 + for tr.Next() { + rec := tr.Record() + vec := array.NewInt64Data(rec.Column(1).Data()) + totalFromArrow += math.Int64.Sum(vec) + } + if totalFromArrow != totalFromSQL { + t.Fatalf("expected total to be %d, but with arrow we got %d", totalFromSQL, totalFromArrow) + } +} + +// Helper struct that implements io.Reader to read +// full ArrowIterator with ipc.Reader +type arrowIteratorReader struct { + buf *bytes.Buffer + it ArrowIterator +} + +// Read makes arrowIteratorReader implement io.Reader +func (r *arrowIteratorReader) Read(p []byte) (int, error) { + if r.buf == nil { // init with schema + buf := bytes.NewBuffer(r.it.SerializedArrowSchema()) + r.buf = buf + } + n, err := r.buf.Read(p) + if err == io.EOF { + batch, err := r.it.Next() + if err == iterator.Done { + return -1, io.EOF + } + r.buf.Write(batch.Data) + return r.Read(p) + } + return n, err +} + func countIteratorRows(it *RowIterator) (total uint64, err error) { for { var dst []Value diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go index cacd1c7858eb..853f01262762 100644 --- a/bigquery/storage_iterator.go +++ b/bigquery/storage_iterator.go @@ -349,3 +349,15 @@ func (it *storageArrowIterator) Schema() Schema { func (it *RowIterator) IsAccelerated() bool { return it.arrowIterator != nil } + +// ArrowIterator gives access to the raw Arrow Record Batch stream to be consumed directly. +// Experimental: this interface is experimental and may be modified or removed in future versions, +// regardless of any other documented package stability guarantees. +// Don't try to mix RowIterator.Next and ArrowIterator.Next calls. +func (it *RowIterator) ArrowIterator() (ArrowIterator, error) { + if !it.IsAccelerated() { + // TODO: can we convert plain RowIterator based on JSON API to an Arrow Stream ? + return nil, errors.New("bigquery: require storage read API to be enabled") + } + return it.arrowIterator, nil +} From 6969f677a2fd1352637e679f1eeb6201e807809f Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 29 Aug 2023 14:04:40 -0400 Subject: [PATCH 03/10] chore(bigquery): upgrade arrow to v13 --- bigquery/arrow.go | 6 ++-- bigquery/go.mod | 18 ++++------- bigquery/go.sum | 45 ++++++++++------------------ bigquery/storage_integration_test.go | 8 ++--- 4 files changed, 28 insertions(+), 49 deletions(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 51c77d476cbc..08a2b587f8f9 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -22,9 +22,9 @@ import ( "math/big" "cloud.google.com/go/civil" - "github.com/apache/arrow/go/v12/arrow" - "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" ) // ArrowRecordBatch represents an Arrow RecordBatch with the source PartitionID diff --git a/bigquery/go.mod b/bigquery/go.mod index 802184dbd941..e8534c983ea1 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -7,7 +7,7 @@ require ( cloud.google.com/go/datacatalog v1.14.0 cloud.google.com/go/iam v1.1.0 cloud.google.com/go/storage v1.30.1 - github.com/apache/arrow/go/v12 v12.0.0 + github.com/apache/arrow/go/v13 v13.0.0 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 github.com/googleapis/gax-go/v2 v2.12.0 @@ -26,22 +26,16 @@ require ( cloud.google.com/go/compute v1.19.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/longrunning v0.4.2 // indirect - github.com/andybalholm/brotli v1.0.4 // indirect - github.com/apache/thrift v0.16.0 // indirect - github.com/goccy/go-json v0.9.11 // indirect + github.com/goccy/go-json v0.10.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.4 // indirect - github.com/google/flatbuffers v2.0.8+incompatible // indirect + github.com/google/flatbuffers v23.1.21+incompatible // indirect github.com/google/martian/v3 v3.3.2 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect - github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.15.9 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect - github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect - github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/klauspost/compress v1.15.15 // indirect + github.com/klauspost/cpuid/v2 v2.2.3 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/stretchr/testify v1.8.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect golang.org/x/crypto v0.9.0 // indirect diff --git a/bigquery/go.sum b/bigquery/go.sum index 124fa5a130b8..5a0ef3177bdd 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -15,14 +15,9 @@ cloud.google.com/go/longrunning v0.4.2/go.mod h1:OHrnaYyLUV6oqwh0xiS7e5sLQhP1m0Q cloud.google.com/go/storage v1.30.1 h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/oNM= cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= -github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= -github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc= -github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= -github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= -github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= +github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= +github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -43,14 +38,13 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= -github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -69,9 +63,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= -github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA= +github.com/google/flatbuffers v23.1.21+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -93,18 +86,12 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= -github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= -github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= -github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= -github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= +github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -127,18 +114,16 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -178,6 +163,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -201,12 +187,11 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= google.golang.org/api v0.128.0 h1:RjPESny5CnQRn9V6siglged+DZCgfu9l6mO9dkX9VOg= google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 26c124b463a0..9e00bc569e0b 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -24,10 +24,10 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "github.com/apache/arrow/go/v12/arrow" - "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/ipc" - "github.com/apache/arrow/go/v12/arrow/math" + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/math" "github.com/google/go-cmp/cmp" "google.golang.org/api/iterator" ) From 6765573bbf3cba27c3b477bb49147655906b6ac7 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 11 Sep 2023 09:31:34 -0400 Subject: [PATCH 04/10] feat(bigquery): add ArrowIteratorReader --- bigquery/arrow.go | 33 ++++++++++++++++++++++ bigquery/storage_integration_test.go | 42 ++++------------------------ 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 08a2b587f8f9..3f105d2e4c68 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -25,6 +25,7 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/ipc" + "google.golang.org/api/iterator" ) // ArrowRecordBatch represents an Arrow RecordBatch with the source PartitionID @@ -57,6 +58,38 @@ type ArrowIterator interface { SerializedArrowSchema() []byte } +// NewArrowIteratorReader allows to consume an ArrowIterator as an io.Reader. +// Experimental: this interface is experimental and may be modified or removed in future versions, +// regardless of any other documented package stability guarantees. +func NewArrowIteratorReader(it ArrowIterator) io.Reader { + return &arrowIteratorReader{ + it: it, + } +} + +type arrowIteratorReader struct { + buf *bytes.Buffer + it ArrowIterator +} + +// Read makes ArrowIteratorReader implement io.Reader +func (r *arrowIteratorReader) Read(p []byte) (int, error) { + if r.buf == nil { // init with schema + buf := bytes.NewBuffer(r.it.SerializedArrowSchema()) + r.buf = buf + } + n, err := r.buf.Read(p) + if err == io.EOF { + batch, err := r.it.Next() + if err == iterator.Done { + return -1, io.EOF + } + r.buf.Write(batch.Data) + return r.Read(p) + } + return n, err +} + type arrowDecoder struct { tableSchema Schema arrowSchema *arrow.Schema diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 9e00bc569e0b..8665bf4c34ef 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -15,11 +15,9 @@ package bigquery import ( - "bytes" "context" "errors" "fmt" - "io" "testing" "time" @@ -448,15 +446,15 @@ func TestIntegration_StorageReadArrow(t *testing.T) { if err != nil { t.Fatal(err) } + arrowIt, err := it.ArrowIterator() if err != nil { t.Fatalf("expected iterator to be accelerated: %v", err) } - var arrowSchema *arrow.Schema + arrowItReader := NewArrowIteratorReader(arrowIt) + records := []arrow.Record{} - r, err := ipc.NewReader(&arrowIteratorReader{ - it: arrowIt, - }) + r, err := ipc.NewReader(arrowItReader) numrec := 0 for r.Next() { rec := r.Record() @@ -464,14 +462,9 @@ func TestIntegration_StorageReadArrow(t *testing.T) { records = append(records, rec) numrec += int(rec.NumRows()) } - arrowSchema = r.Schema() r.Release() - t.Logf("decoded %d records", numrec) - if arrowSchema == nil { - t.Fatal("should have Arrow table available, but nil found") - } - t.Logf("decoded total of %d record batches", len(records)) + arrowSchema := r.Schema() arrowTable := array.NewTableFromRecords(arrowSchema, records) defer arrowTable.Release() if arrowTable.NumRows() != int64(it.TotalRows) { @@ -506,31 +499,6 @@ func TestIntegration_StorageReadArrow(t *testing.T) { } } -// Helper struct that implements io.Reader to read -// full ArrowIterator with ipc.Reader -type arrowIteratorReader struct { - buf *bytes.Buffer - it ArrowIterator -} - -// Read makes arrowIteratorReader implement io.Reader -func (r *arrowIteratorReader) Read(p []byte) (int, error) { - if r.buf == nil { // init with schema - buf := bytes.NewBuffer(r.it.SerializedArrowSchema()) - r.buf = buf - } - n, err := r.buf.Read(p) - if err == io.EOF { - batch, err := r.it.Next() - if err == iterator.Done { - return -1, io.EOF - } - r.buf.Write(batch.Data) - return r.Read(p) - } - return n, err -} - func countIteratorRows(it *RowIterator) (total uint64, err error) { for { var dst []Value From 87fb373e6d573fdc8aa3994a13134eb13e6f7d04 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 5 Oct 2023 13:32:59 -0400 Subject: [PATCH 05/10] fix(bigquery): return err if ArrowIterator is nil --- bigquery/arrow.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 3f105d2e4c68..457790e57cfd 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -17,6 +17,7 @@ package bigquery import ( "bytes" "encoding/base64" + "errors" "fmt" "io" "math/big" @@ -74,6 +75,9 @@ type arrowIteratorReader struct { // Read makes ArrowIteratorReader implement io.Reader func (r *arrowIteratorReader) Read(p []byte) (int, error) { + if r.it == nil { + return -1, errors.New("bigquery: nil ArrowIterator") + } if r.buf == nil { // init with schema buf := bytes.NewBuffer(r.it.SerializedArrowSchema()) r.buf = buf From aee80186b0533618fb65de67fd4d9d6a30b57e11 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 9 Oct 2023 14:23:09 -0400 Subject: [PATCH 06/10] chore: rollback arrow version to upgrade it separately --- bigquery/arrow.go | 6 +++--- bigquery/go.mod | 8 +++++++- bigquery/go.sum | 24 ++++++++++++++++++++---- bigquery/storage_integration_test.go | 8 ++++---- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 457790e57cfd..9c4a99ddac80 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -23,9 +23,9 @@ import ( "math/big" "cloud.google.com/go/civil" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/ipc" "google.golang.org/api/iterator" ) diff --git a/bigquery/go.mod b/bigquery/go.mod index e8534c983ea1..97faf23fab00 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -7,7 +7,7 @@ require ( cloud.google.com/go/datacatalog v1.14.0 cloud.google.com/go/iam v1.1.0 cloud.google.com/go/storage v1.30.1 - github.com/apache/arrow/go/v13 v13.0.0 + github.com/apache/arrow/go/v12 v12.0.0 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 github.com/googleapis/gax-go/v2 v2.12.0 @@ -26,15 +26,21 @@ require ( cloud.google.com/go/compute v1.19.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/longrunning v0.4.2 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/apache/thrift v0.16.0 // indirect github.com/goccy/go-json v0.10.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.1.21+incompatible // indirect github.com/google/martian/v3 v3.3.2 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.15.15 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/stretchr/testify v1.8.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect diff --git a/bigquery/go.sum b/bigquery/go.sum index 5a0ef3177bdd..e77bf7aeace6 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -15,9 +15,14 @@ cloud.google.com/go/longrunning v0.4.2/go.mod h1:OHrnaYyLUV6oqwh0xiS7e5sLQhP1m0Q cloud.google.com/go/storage v1.30.1 h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/oNM= cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= -github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= +github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc= +github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= +github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= +github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -45,6 +50,7 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -63,6 +69,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA= github.com/google/flatbuffers v23.1.21+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -86,10 +93,16 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -114,16 +127,18 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -187,11 +202,12 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= google.golang.org/api v0.128.0 h1:RjPESny5CnQRn9V6siglged+DZCgfu9l6mO9dkX9VOg= google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 8665bf4c34ef..0771d889f144 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -22,10 +22,10 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "github.com/apache/arrow/go/v13/arrow" - "github.com/apache/arrow/go/v13/arrow/array" - "github.com/apache/arrow/go/v13/arrow/ipc" - "github.com/apache/arrow/go/v13/arrow/math" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/ipc" + "github.com/apache/arrow/go/v12/arrow/math" "github.com/google/go-cmp/cmp" "google.golang.org/api/iterator" ) From d142c53d29d529e0b7d6899eb59c4ac4c83e97d9 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 9 Oct 2023 14:25:53 -0400 Subject: [PATCH 07/10] chore: rollback other lib upgrade to upgrade them separately --- bigquery/go.mod | 10 +++++----- bigquery/go.sum | 21 ++++++++++----------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/bigquery/go.mod b/bigquery/go.mod index 97faf23fab00..802184dbd941 100644 --- a/bigquery/go.mod +++ b/bigquery/go.mod @@ -28,20 +28,20 @@ require ( cloud.google.com/go/longrunning v0.4.2 // indirect github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/thrift v0.16.0 // indirect - github.com/goccy/go-json v0.10.0 // indirect + github.com/goccy/go-json v0.9.11 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/flatbuffers v23.1.21+incompatible // indirect + github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/google/martian/v3 v3.3.2 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.15.15 // indirect - github.com/klauspost/cpuid/v2 v2.2.3 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/stretchr/testify v1.8.2 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect golang.org/x/crypto v0.9.0 // indirect diff --git a/bigquery/go.sum b/bigquery/go.sum index e77bf7aeace6..124fa5a130b8 100644 --- a/bigquery/go.sum +++ b/bigquery/go.sum @@ -43,8 +43,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -70,8 +70,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/flatbuffers v23.1.21+incompatible h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA= -github.com/google/flatbuffers v23.1.21+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= +github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -95,16 +95,16 @@ github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qK github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= -github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= -github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -178,7 +178,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= From 0762adc1f8a343b46e9946aba30c2eb292814dee Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 10 Oct 2023 12:28:47 -0400 Subject: [PATCH 08/10] fix: return 0 and io.EOF when iterator is done --- bigquery/arrow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 9c4a99ddac80..96e7a3ab4978 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -86,7 +86,7 @@ func (r *arrowIteratorReader) Read(p []byte) (int, error) { if err == io.EOF { batch, err := r.it.Next() if err == iterator.Done { - return -1, io.EOF + return 0, io.EOF } r.buf.Write(batch.Data) return r.Read(p) From c08080ed5c54936390c4409bcf68ed1c0f557765 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 10 Oct 2023 12:40:49 -0400 Subject: [PATCH 09/10] fix: release arrow records and avoid extra alloc of array.Int64 --- bigquery/storage_integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 0771d889f144..c6b318af5d76 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -459,6 +459,7 @@ func TestIntegration_StorageReadArrow(t *testing.T) { for r.Next() { rec := r.Record() rec.Retain() + defer rec.Release() records = append(records, rec) numrec += int(rec.NumRows()) } @@ -491,7 +492,7 @@ func TestIntegration_StorageReadArrow(t *testing.T) { var totalFromArrow int64 for tr.Next() { rec := tr.Record() - vec := array.NewInt64Data(rec.Column(1).Data()) + vec := rec.Column(1).(*array.Int64) totalFromArrow += math.Int64.Sum(vec) } if totalFromArrow != totalFromSQL { From 3c50652d29a2cdb730121fbea4a2711039ea54f0 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 12 Oct 2023 11:33:31 -0400 Subject: [PATCH 10/10] test: add check allocator to check for memory leaks when retaining arrow structures --- bigquery/arrow.go | 9 ++++++++- bigquery/storage_integration_test.go | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/bigquery/arrow.go b/bigquery/arrow.go index 96e7a3ab4978..bd7f0c3ebf26 100644 --- a/bigquery/arrow.go +++ b/bigquery/arrow.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/ipc" + "github.com/apache/arrow/go/v12/arrow/memory" "google.golang.org/api/iterator" ) @@ -95,6 +96,7 @@ func (r *arrowIteratorReader) Read(p []byte) (int, error) { } type arrowDecoder struct { + allocator memory.Allocator tableSchema Schema arrowSchema *arrow.Schema } @@ -109,12 +111,17 @@ func newArrowDecoder(arrowSerializedSchema []byte, schema Schema) (*arrowDecoder p := &arrowDecoder{ tableSchema: schema, arrowSchema: r.Schema(), + allocator: memory.DefaultAllocator, } return p, nil } func (ap *arrowDecoder) createIPCReaderForBatch(arrowRecordBatch *ArrowRecordBatch) (*ipc.Reader, error) { - return ipc.NewReader(arrowRecordBatch, ipc.WithSchema(ap.arrowSchema)) + return ipc.NewReader( + arrowRecordBatch, + ipc.WithSchema(ap.arrowSchema), + ipc.WithAllocator(ap.allocator), + ) } // decodeArrowRecords decodes BQ ArrowRecordBatch into rows of []Value. diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index c6b318af5d76..ed4114f56d0a 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/ipc" "github.com/apache/arrow/go/v12/arrow/math" + "github.com/apache/arrow/go/v12/arrow/memory" "github.com/google/go-cmp/cmp" "google.golang.org/api/iterator" ) @@ -447,6 +448,10 @@ func TestIntegration_StorageReadArrow(t *testing.T) { t.Fatal(err) } + checkedAllocator := memory.NewCheckedAllocator(memory.DefaultAllocator) + it.arrowDecoder.allocator = checkedAllocator + defer checkedAllocator.AssertSize(t, 0) + arrowIt, err := it.ArrowIterator() if err != nil { t.Fatalf("expected iterator to be accelerated: %v", err) @@ -454,7 +459,7 @@ func TestIntegration_StorageReadArrow(t *testing.T) { arrowItReader := NewArrowIteratorReader(arrowIt) records := []arrow.Record{} - r, err := ipc.NewReader(arrowItReader) + r, err := ipc.NewReader(arrowItReader, ipc.WithAllocator(checkedAllocator)) numrec := 0 for r.Next() { rec := r.Record() @@ -489,6 +494,7 @@ func TestIntegration_StorageReadArrow(t *testing.T) { totalFromSQL := sumValues[0].(int64) tr := array.NewTableReader(arrowTable, arrowTable.NumRows()) + defer tr.Release() var totalFromArrow int64 for tr.Next() { rec := tr.Record()