Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigquery connector changes #3001

Merged
merged 19 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/XSAM/otelsql v0.23.0
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/apache/arrow/go/v11 v11.0.0
github.com/apache/arrow/go/v13 v13.0.0
github.com/apache/calcite-avatica-go/v5 v5.2.0
github.com/aws/aws-sdk-go v1.44.268
github.com/benbjohnson/clock v1.3.5
Expand Down Expand Up @@ -75,9 +75,9 @@ require (
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.2.0
golang.org/x/sys v0.9.0
google.golang.org/api v0.126.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
google.golang.org/api v0.128.0
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.31.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -89,7 +89,7 @@ require (
require google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect

require (
cloud.google.com/go v0.110.2
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/bigquery v1.52.0
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
Expand All @@ -103,7 +103,6 @@ require (
github.com/acomagu/bufpipe v1.0.4 // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
Expand Down Expand Up @@ -161,8 +160,8 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/wire v0.5.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/icholy/digest v0.1.22 // indirect
Expand Down Expand Up @@ -260,4 +259,6 @@ require (

replace github.com/apache/calcite-avatica-go/v5 v5.2.0 => github.com/rilldata/calcite-avatica-go/v5 v5.0.0-20230621112535-eea498ff2a3d

replace cloud.google.com/go/bigquery v1.52.0 => github.com/k-anshul/google-cloud-go/bigquery v0.0.0-20230830110740-57fe623cde03
k-anshul marked this conversation as resolved.
Show resolved Hide resolved

exclude modernc.org/sqlite v1.18.1
32 changes: 16 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM7
cloud.google.com/go/bigquery v1.42.0/go.mod h1:8dRTJxhtG+vwBKzE5OseQn/hiydoQN3EedCaOdYmxRA=
cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
cloud.google.com/go/bigquery v1.44.0/go.mod h1:0Y33VqXTEsbamHJvJHdFmtqHvMIY28aK1+dFsvaChGc=
cloud.google.com/go/bigquery v1.52.0 h1:JKLNdxI0N+TIUWD6t9KN646X27N5dQWq9dZbbTWZ8hc=
cloud.google.com/go/bigquery v1.52.0/go.mod h1:3b/iXjRQGU4nKa87cXeg6/gogLjO8C6PmuM8i5Bi/u4=
cloud.google.com/go/billing v1.4.0/go.mod h1:g9IdKBEFlItS8bTtlrZdVLWSSdSyFUZKXNS02zKMOZY=
cloud.google.com/go/billing v1.5.0/go.mod h1:mztb1tBc3QekhjSgmpf/CV4LzWXLzCArwpLmP2Gm88s=
cloud.google.com/go/billing v1.6.0/go.mod h1:WoXzguj+BeHXPbKfNWkqVtDdzORazmCjraY+vrxcyvI=
Expand Down Expand Up @@ -565,10 +563,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v11 v11.0.0 h1:hqauxvFQxww+0mEU/2XHG6LT7eZternCZq+A5Yly2uM=
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI=
github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc=
github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down Expand Up @@ -741,6 +737,7 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
Expand Down Expand Up @@ -1299,8 +1296,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99
github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.4 h1:uGy6JWR/uMIILU8wbf+OkstIrNiMjGpEIyhx8f6W7s4=
github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
Expand All @@ -1311,8 +1309,8 @@ github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK
github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo=
github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMdcIDwU/6+DDoY=
github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8=
github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cUUI8Ki4=
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/googleapis/gnostic v0.5.1/go.mod h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2cUuW7uA/OeU=
github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA=
Expand Down Expand Up @@ -1561,6 +1559,8 @@ github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJME
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k-anshul/google-cloud-go/bigquery v0.0.0-20230830110740-57fe623cde03 h1:Bx9nbiPCx375f58XQiLf4Rhc4UdxSd93PZRODmc0HvY=
github.com/k-anshul/google-cloud-go/bigquery v0.0.0-20230830110740-57fe623cde03/go.mod h1:sJyijXv7F7/CRsPuWjId7YvBMzDi2H8qokBxqstIY+U=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 h1:M8exrBzuhWcU6aoHJlHWPe4qFjVKzkMGRal78f5jRRU=
Expand Down Expand Up @@ -2863,7 +2863,7 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.0.0-20160322025152-9bf6e6e569ff/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
Expand Down Expand Up @@ -2927,8 +2927,8 @@ google.golang.org/api v0.106.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
google.golang.org/api v0.107.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o=
google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw=
google.golang.org/api v0.128.0 h1:RjPESny5CnQRn9V6siglged+DZCgfu9l6mO9dkX9VOg=
google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down Expand Up @@ -3124,8 +3124,8 @@ google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/grpc v1.52.1/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/grpc v1.56.1 h1:z0dNfjIl0VpaZ9iSVjA6daGatAYwPGstTjt5vkRMFkQ=
google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand All @@ -3142,8 +3142,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A=
Expand Down
155 changes: 155 additions & 0 deletions runtime/drivers/bigquery/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package bigquery

import (
"bytes"
"context"
"errors"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/ipc"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/rilldata/rill/runtime/pkg/observability"
"go.uber.org/zap"
"google.golang.org/api/iterator"
)

func (f *fileIterator) AsArrowRecordReader() (array.RecordReader, error) {
arrowIt, err := f.bqIter.ArrowIterator()
if err != nil {
return nil, err
}

allocator := memory.DefaultAllocator
buf := bytes.NewBuffer(arrowIt.SerializedArrowSchema())
rdr, err := ipc.NewReader(buf, ipc.WithAllocator(allocator))
if err != nil {
return nil, err
}
defer rdr.Release()

rec := &arrowRecordReader{
bqIter: arrowIt,
arrowSchema: rdr.Schema(),
refCount: 1,
allocator: allocator,
logger: f.logger,
records: make([]arrow.Record, 0),
ctx: f.ctx,
}
return rec, nil
}

// some impl details are copied from array.simpleRecords
type arrowRecordReader struct {
bqIter bigquery.ArrowIterator
records []arrow.Record
cur arrow.Record
arrowSchema *arrow.Schema
refCount int64
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
err error
logger *zap.Logger
allocator memory.Allocator

apinext time.Duration
ipcread time.Duration

ctx context.Context
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (rs *arrowRecordReader) Retain() {
atomic.AddInt64(&rs.refCount, 1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (rs *arrowRecordReader) Release() {
if atomic.LoadInt64(&rs.refCount) <= 0 {
return
}

if atomic.AddInt64(&rs.refCount, -1) == 0 {
if rs.cur != nil {
rs.cur.Release()
}
for _, rec := range rs.records {
rec.Release()
}
rs.records = nil
}
rs.logger.Info("next api call took", zap.Float64("seconds", rs.apinext.Seconds()), observability.ZapCtx(rs.ctx))
rs.logger.Info("next ipc read took", zap.Float64("seconds", rs.ipcread.Seconds()), observability.ZapCtx(rs.ctx))
}

// Schema returns the underlying arrow schema
func (rs *arrowRecordReader) Schema() *arrow.Schema {
return rs.arrowSchema
}

// Record returns the current record. Call Next before consuming another record.
func (rs *arrowRecordReader) Record() arrow.Record {
return rs.cur
}

// Next returns true if another record can be produced
func (rs *arrowRecordReader) Next() bool {
if rs.err != nil {
return false
}

if len(rs.records) == 0 {
tz := time.Now()
next, err := rs.bqIter.Next()
if err != nil {
rs.err = err
return false
}
rs.apinext += time.Since(tz)

rs.records, rs.err = rs.nextArrowRecords(next)
if rs.err != nil {
return false
}
}
if rs.cur != nil {
rs.cur.Release()
}
rs.cur = rs.records[0]
rs.records = rs.records[1:]
return true
}

func (rs *arrowRecordReader) Err() error {
if errors.Is(rs.err, iterator.Done) {
return nil
}
return rs.err
}

func (rs *arrowRecordReader) nextArrowRecords(r *bigquery.ArrowRecordBatch) ([]arrow.Record, error) {
t := time.Now()
defer func() {
rs.ipcread += time.Since(t)
}()

buf := bytes.NewBuffer(rs.bqIter.SerializedArrowSchema())
buf.Write(r.Data)
rdr, err := ipc.NewReader(buf, ipc.WithSchema(rs.arrowSchema), ipc.WithAllocator(rs.allocator))
if err != nil {
return nil, err
}
defer rdr.Release()
records := make([]arrow.Record, 0)
for rdr.Next() {
rec := rdr.Record()
rec.Retain()
records = append(records, rec)
}
return records, rdr.Err()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the extra layer of buffering in rs.records needed versus directly buffering rdr and proxying to it in Next()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried proxying rdr to next but overall it feels lot more complicated than current implementation. There are multiple cases to consider. rdr will be nil in first next call, rdr can return next as false if is are no more data or if there is some genuine error(which needs to be handled separately as well). Overall it felt lot easier to just look at the size of the array.
Also as of now ArrowRecordBatch from bigquery will only return one arrow record but it's better to keep array for any change in underlying impls in future.

}
Loading