Bigquery connector changes #3001

Merged

merged 19 commits into main from arrow on Sep 6, 2023
Conversation


@k-anshul (Member) commented on Aug 30, 2023

This PR makes the following changes to the existing logic of the BigQuery connector:

  1. Creates local temporary parquet files from the arrow records returned by the BigQuery SDK (a sketch of this flow follows below). This logic currently lives in a fork of the BigQuery SDK that carries the changes from the draft PR feat(bigquery): expose Apache Arrow data through ArrowIterator (googleapis/google-cloud-go#8506); we will migrate to the upstream SDK once that PR is merged.
  2. The parquet files are then ingested into DuckDB.

For cases where the BigQuery SDK does not return arrow records, we fall back to dumping the records in JSON format.
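Roughly, the flow looks like the sketch below. This is a hedged illustration only (the package, function and table names are illustrative assumptions, not the connector's actual code), assuming the apache/arrow Go pqarrow writer and a DuckDB database/sql driver registered by the caller:

// Package example holds a hedged sketch of the flow described above: stream arrow records
// into a temporary parquet file, then load that file into DuckDB.
package example

import (
	"database/sql"
	"fmt"
	"os"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/parquet"
	"github.com/apache/arrow/go/v13/parquet/pqarrow"
)

// writeParquet streams arrow records into a parquet file at path.
func writeParquet(schema *arrow.Schema, records []arrow.Record, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	w, err := pqarrow.NewFileWriter(schema, f, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
	if err != nil {
		return err
	}
	for _, rec := range records {
		if err := w.Write(rec); err != nil {
			w.Close()
			return err
		}
	}
	return w.Close() // flushes buffered data and writes the parquet footer
}

// ingestIntoDuckDB loads the parquet file into a DuckDB table. db is assumed to be opened by the
// caller with a DuckDB database/sql driver; the path must be trusted (no escaping is done here).
func ingestIntoDuckDB(db *sql.DB, path string) error {
	q := fmt.Sprintf("CREATE OR REPLACE TABLE bq_data AS SELECT * FROM read_parquet('%s')", path)
	_, err := db.Exec(q)
	return err
}

DuckDB scans the parquet file directly via read_parquet, so no intermediate row-by-row conversion is needed.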

Changes compared to the previous implementation:

  1. BIGNUMERIC is no longer directly supported. Users can cast it to a string, or to NUMERIC in the SQL query if some loss of precision is acceptable.
  2. Repeated and nested types are now supported.
  3. Hopefully faster than the previous approach for larger datasets :)

@k-anshul changed the title from Arrow to Bigquery connector v2 on Aug 30, 2023
@k-anshul changed the title from Bigquery connector v2 to Bigquery connector changes on Aug 30, 2023
@k-anshul marked this pull request as ready for review on August 30, 2023 10:02
@k-anshul (Member, Author):

Sample performance results:
Query: SELECT * FROM bigquery-public-data.covid19_open_data.compatibility_view LIMIT 10000000
Old approach: 135 seconds
New approach: 60 seconds

Resolved review threads: go.mod; runtime/drivers/bigquery/sql_store.go (outdated)
Comment on lines +186 to +187
writer.Close()
fw.Close()
Contributor:
There are already deferred calls to these – is it safe/necessary to call twice?

Member Author:

Yes. In the deferred calls, the error returned by writer.Close() is simply ignored, and the second fw.Close() is a no-op. There are many return paths in this code, so I used defer as well so that both are closed on the error paths.
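For illustration, a minimal sketch of that pattern (names are illustrative, not the PR's code):

package example

import "os"

// writeAll defers Close so every early-return path releases the file, and also calls Close
// explicitly on the success path so its error can be checked rather than silently dropped.
func writeAll(path string, write func(*os.File) error) error {
	fw, err := os.Create(path)
	if err != nil {
		return err
	}
	// The deferred call covers the error paths; calling Close twice on an *os.File is harmless,
	// the second call simply returns an "already closed" error that the defer ignores.
	defer fw.Close()

	if err := write(fw); err != nil {
		return err
	}
	return fw.Close()
}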

Resolved review threads (outdated): runtime/pkg/fileutil/fileutil.go; runtime/drivers/duckdb/transporter/sqlstore_to_duckDB.go; runtime/drivers/bigquery/arrow.go
Comment on lines 100 to 154
// Next returns true if another record can be produced
func (rs *arrowRecordReader) Next() bool {
if rs.err != nil {
return false
}

if len(rs.records) == 0 {
tz := time.Now()
next, err := rs.bqIter.Next()
if err != nil {
rs.err = err
return false
}
rs.apinext += time.Since(tz)

rs.records, rs.err = rs.nextArrowRecords(next)
if rs.err != nil {
return false
}
}
if rs.cur != nil {
rs.cur.Release()
}
rs.cur = rs.records[0]
rs.records = rs.records[1:]
return true
}

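// Err returns the first error encountered, treating iterator.Done as the normal end of data rather than an error.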
func (rs *arrowRecordReader) Err() error {
if errors.Is(rs.err, iterator.Done) {
return nil
}
return rs.err
}

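// nextArrowRecords decodes a single BigQuery ArrowRecordBatch into arrow records by prepending the
// serialized arrow schema to the batch bytes and reading them through an IPC reader.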
func (rs *arrowRecordReader) nextArrowRecords(r *bigquery.ArrowRecordBatch) ([]arrow.Record, error) {
t := time.Now()
defer func() {
rs.ipcread += time.Since(t)
}()

buf := bytes.NewBuffer(rs.bqIter.SerializedArrowSchema())
buf.Write(r.Data)
rdr, err := ipc.NewReader(buf, ipc.WithSchema(rs.arrowSchema), ipc.WithAllocator(rs.allocator))
if err != nil {
return nil, err
}
defer rdr.Release()
records := make([]arrow.Record, 0)
for rdr.Next() {
rec := rdr.Record()
rec.Retain()
records = append(records, rec)
}
return records, rdr.Err()
Contributor:
Why is the extra layer of buffering in rs.records needed versus directly buffering rdr and proxying to it in Next()?

Member Author:

I tried proxying rdr in Next, but overall it feels a lot more complicated than the current implementation. There are multiple cases to consider: rdr is nil on the first Next call, and rdr.Next() can return false either because there is no more data or because a genuine error occurred (which needs to be handled separately as well). Overall it felt much easier to just look at the size of the slice.
Also, as of now an ArrowRecordBatch from BigQuery yields only one arrow record, but it's better to keep the slice in case the underlying implementation changes in the future.

@k-anshul merged commit ccd6fd2 into main on Sep 6, 2023
@k-anshul deleted the arrow branch on September 6, 2023 09:17