Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): expose Apache Arrow data through ArrowIterator #8506

Merged
merged 13 commits into from
Oct 23, 2023

Conversation

alvarowolfx
Copy link
Contributor

As we have some planned work to support Arrow data fetching on other query APIs, so we need to think of an interface that will support all of those query paths and also work as a base for other Arrow projects like ADBC. So this PR detaches the Storage API from the Arrow Decoder and creates a new ArrowIterator interface. This new interface is implemented by the Storage iterator and later can be implemented for other query interfaces that supports Arrow.

Resolves #8100

@product-auto-label product-auto-label bot added size: l Pull request size is large. api: bigquery Issues related to the BigQuery API. labels Aug 29, 2023
t.Fatal("expected stream to be done")
}
}

func TestIntegration_StorageReadArrow(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@k-anshul @zeroshade this integration test show an example on how that interface would be used.

@alvarowolfx alvarowolfx requested a review from shollyman August 29, 2023 15:47
@alvarowolfx alvarowolfx marked this pull request as ready for review August 29, 2023 21:41
@alvarowolfx alvarowolfx requested review from a team as code owners August 29, 2023 21:41
@zeroshade
Copy link

@alvarowolfx I'm gonna try to get a thorough review of this in the next day or two from the Arrow perspective. Thanks for doing this!

Comment on lines 457 to 459
r, err := ipc.NewReader(&arrowIteratorReader{
it: arrowIt,
})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have an interface that doesn't require consumers to implement their own arrowIteratorReader?

Comment on lines 204 to 206
wg := sync.WaitGroup{}
wg.Add(len(streams))
sem := semaphore.NewWeighted(int64(it.session.settings.maxWorkerCount))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be feasible to expose the streams themselves to a consumer to allow them to control the parallelization instead of forcing them to a specific route like this?

@zeroshade
Copy link

The new ArrowIteratorReader looks good to me, so this seems pretty good from my perspective, though it would be nice to be able to expose the raw streams to the consumer directly allowing them to control parallelization and partition downloading possibly. But that's certainly something for a future enhancement I think.

@product-auto-label product-auto-label bot added the stale: old Pull request is old and needs attention. label Sep 29, 2023
if err == io.EOF {
batch, err := r.it.Next()
if err == iterator.Done {
return -1, io.EOF
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading of the io.Reader interface would suggest we should return 0 rather than a negative value.

Copy link
Contributor Author

@alvarowolfx alvarowolfx Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, just sent a fix

numrec := 0
for r.Next() {
rec := r.Record()
rec.Retain()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by the release/retains here, but I've not been spending much time with arrow recently. If you retain individual records do you need to release them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are going to be releases by the ipc.Reader later on the r.Release() call right after.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not exactly. If you call Retain on an individual record, then you will need to call Release on that record.

The ipc.Reader keeps only the current Record, reusing that member. When you call Next() it will release the record it had before loading the next one. This is why you need to call Retain on the records that you put into the slice, so that they aren't deallocated by the ipc.Reader calling Release on them. However you should also add a defer rec.Release() in the loop to ensure that record gets released, the ipc.Reader will not retain any references to those records and therefore will not call Release on them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, I didn't know that. I'll make the changes to call rec.Release on each record.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to verify that everything is released/retained correctly, you could use memory.CheckedAllocator and defer mem.AssertSize(t, 0) then pass the checked allocator to everything (like to ipc.NewReader) so that it is used for all the memory allocations.

Not absolutely necessary, but an optional way to add some assurances if desired.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna add support for changing the allocator just internally for now for test purposes, I liked the idea of verifying that there are no memory leaks. Thanks for the tip.

var totalFromArrow int64
for tr.Next() {
rec := tr.Record()
vec := array.NewInt64Data(rec.Column(1).Data())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused here, why array.NewInt64Data(rec.Column(1).Data())? Why not just rec.Column(1).(*array.Int64)?

You're performing an additional allocation here when you don't need to be, and it's also a potential memory leak since this new array instance will not get released when the record is released.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know that the rec.Column here could be type converted to *array.Int64 directly. Is always hard to know when you can do those type conversions directly, so I was trying to use the API methods to do that instead. I'll make the change here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, rec.Column returns an arrow.Array interface value, so as long as you know the column is an int64 column, then you can type assert it to *array.Int64. Alternately you can do a type switch / check the data type, etc. before doing the type assertion. The intent is to minimize copying and minimize allocations when handling arrays and utilizing them for records etc.

}
totalFromSQL := sumValues[0].(int64)

tr := array.NewTableReader(arrowTable, arrowTable.NumRows())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add defer tr.Release() please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after adding support for changing the allocator I found that without the tr.Release a memory leak was happening. Good catch and awesome tips on how to catch those leaks 🎉

arrowSchema *arrow.Schema
}

func newArrowDecoder(arrowSerializedSchema []byte, schema Schema) (*arrowDecoder, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would probably be worthwhile (though certainly could be done as a follow-up) to allow passing a memory.Allocator interface here that would be stored in the arrowDecoder to allow a user to configure how memory gets allocated for the arrow batches (it would be passed as ipc.WithAllocator(mem) to ipc.NewReader)

In most cases users would probalby just use memory.DefaultAllocator but in other cases, depending on the constraints of the system, they might want to use a custom allocator such as a malloc based allocator that uses C memory to avoid garbage collection passes, or any other custom allocation they might want for specialized situations.

The other benefit of this would be that you could use memory.CheckedAllocator in unit tests to verify that everything properly has Release called if necessary, etc.

buf.Write(serializedArrowRecordBatch)
return ipc.NewReader(buf, ipc.WithSchema(ap.arrowSchema))
func (ap *arrowDecoder) createIPCReaderForBatch(arrowRecordBatch *ArrowRecordBatch) (*ipc.Reader, error) {
return ipc.NewReader(arrowRecordBatch, ipc.WithSchema(ap.arrowSchema))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above, it would be great (but could be a follow-up) if either newArrowDecoder accepted a memory.Allocator which would get used here as ipc.WithAllocator(mem) or if this method optionally took an allocator (defaulting to memory.DefaultAllocator if nil was passed)

Copy link

@zeroshade zeroshade left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a couple comments, but overall this looks good to me, though I would like to point at #8506 (comment) for possible consideration.

ADBC allows for a query to return a result set via partition identifiers (arbitrary byte blobs for a given driver) and then allow the consumer to retrieve the streams of data from each partition in parallel. Since BigQuery already can return the data as multiple streams, there could be a benefit to allowing this Arrow iterator to return the raw underlying streams of IPC data instead of only exposing an interface for the collected streams with the parallelization handled in here, rather than by the consumer.

It's not something that I believe should block this PR, but should possibly be picked up in a follow-up as an enhancement.

@alvarowolfx
Copy link
Contributor Author

I'll create another issue to keep track of this request as a future enhancement. Thanks for the deeper review on the PR, I'm gonna push some improvements based on the comments.

Added a couple comments, but overall this looks good to me, though I would like to point at #8506 (comment) for possible consideration.

@alvarowolfx alvarowolfx added the automerge Merge the pull request once unit tests and other checks pass. label Oct 23, 2023
@gcf-merge-on-green gcf-merge-on-green bot merged commit c8e7692 into googleapis:main Oct 23, 2023
8 checks passed
@gcf-merge-on-green gcf-merge-on-green bot removed the automerge Merge the pull request once unit tests and other checks pass. label Oct 23, 2023
gcf-merge-on-green bot pushed a commit that referenced this pull request Oct 30, 2023
🤖 I have created a release *beep* *boop*
---


## [1.57.0](https://github.com/googleapis/google-cloud-go/compare/bigquery/v1.56.0...bigquery/v1.57.0) (2023-10-30)


### Features

* **bigquery/biglake:** Promote to GA ([e864fbc](https://github.com/googleapis/google-cloud-go/commit/e864fbcbc4f0a49dfdb04850b07451074c57edc8))
* **bigquery/storage/managedwriter:** Support default value controls ([#8686](https://github.com/googleapis/google-cloud-go/issues/8686)) ([dfa8e22](https://github.com/googleapis/google-cloud-go/commit/dfa8e22edf560211ae2a2ebf1f9a23b86887c7be))
* **bigquery:** Expose Apache Arrow data through ArrowIterator  ([#8506](https://github.com/googleapis/google-cloud-go/issues/8506)) ([c8e7692](https://github.com/googleapis/google-cloud-go/commit/c8e76923621b379fb7deb6dfb944011af1d980bd)), refs [#8100](https://github.com/googleapis/google-cloud-go/issues/8100)
* **bigquery:** Introduce query preview features ([#8653](https://github.com/googleapis/google-cloud-go/issues/8653)) ([f29683b](https://github.com/googleapis/google-cloud-go/commit/f29683bcd06567e4fc2d404f53bedbea5b5f0f90))


### Bug Fixes

* **bigquery:** Handle storage read api Recv call errors ([#8666](https://github.com/googleapis/google-cloud-go/issues/8666)) ([c73963f](https://github.com/googleapis/google-cloud-go/commit/c73963f64ef667daa8a33a5a4cc2156818fc6914))
* **bigquery:** Update golang.org/x/net to v0.17.0 ([174da47](https://github.com/googleapis/google-cloud-go/commit/174da47254fefb12921bbfc65b7829a453af6f5d))
* **bigquery:** Update grpc-go to v1.56.3 ([343cea8](https://github.com/googleapis/google-cloud-go/commit/343cea8c43b1e31ae21ad50ad31d3b0b60143f8c))
* **bigquery:** Update grpc-go to v1.59.0 ([81a97b0](https://github.com/googleapis/google-cloud-go/commit/81a97b06cb28b25432e4ece595c55a9857e960b7))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
bhshkh pushed a commit that referenced this pull request Nov 3, 2023
As we have some planned work to support Arrow data fetching on other query APIs, so we need to think of an interface that will support all of those query paths and also work as a base for other Arrow projects like ADBC. So this PR detaches the Storage API from the Arrow Decoder and creates a new ArrowIterator interface. This new interface is implemented by the Storage iterator and later can be implemented for other query interfaces that supports Arrow.

Resolves #8100
bhshkh pushed a commit that referenced this pull request Nov 3, 2023
🤖 I have created a release *beep* *boop*
---


## [1.57.0](https://github.com/googleapis/google-cloud-go/compare/bigquery/v1.56.0...bigquery/v1.57.0) (2023-10-30)


### Features

* **bigquery/biglake:** Promote to GA ([e864fbc](https://github.com/googleapis/google-cloud-go/commit/e864fbcbc4f0a49dfdb04850b07451074c57edc8))
* **bigquery/storage/managedwriter:** Support default value controls ([#8686](https://github.com/googleapis/google-cloud-go/issues/8686)) ([dfa8e22](https://github.com/googleapis/google-cloud-go/commit/dfa8e22edf560211ae2a2ebf1f9a23b86887c7be))
* **bigquery:** Expose Apache Arrow data through ArrowIterator  ([#8506](https://github.com/googleapis/google-cloud-go/issues/8506)) ([c8e7692](https://github.com/googleapis/google-cloud-go/commit/c8e76923621b379fb7deb6dfb944011af1d980bd)), refs [#8100](https://github.com/googleapis/google-cloud-go/issues/8100)
* **bigquery:** Introduce query preview features ([#8653](https://github.com/googleapis/google-cloud-go/issues/8653)) ([f29683b](https://github.com/googleapis/google-cloud-go/commit/f29683bcd06567e4fc2d404f53bedbea5b5f0f90))


### Bug Fixes

* **bigquery:** Handle storage read api Recv call errors ([#8666](https://github.com/googleapis/google-cloud-go/issues/8666)) ([c73963f](https://github.com/googleapis/google-cloud-go/commit/c73963f64ef667daa8a33a5a4cc2156818fc6914))
* **bigquery:** Update golang.org/x/net to v0.17.0 ([174da47](https://github.com/googleapis/google-cloud-go/commit/174da47254fefb12921bbfc65b7829a453af6f5d))
* **bigquery:** Update grpc-go to v1.56.3 ([343cea8](https://github.com/googleapis/google-cloud-go/commit/343cea8c43b1e31ae21ad50ad31d3b0b60143f8c))
* **bigquery:** Update grpc-go to v1.59.0 ([81a97b0](https://github.com/googleapis/google-cloud-go/commit/81a97b06cb28b25432e4ece595c55a9857e960b7))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
@Yifeng-Sigma
Copy link

I'm wondering if we have plans to make arrowDecoder public, so we can obtain arrow.Record. Currently seems there's no way to get arrow.Record.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: bigquery Issues related to the BigQuery API. size: l Pull request size is large. stale: old Pull request is old and needs attention.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

bigquery: Expose Apache Arrow data
4 participants