Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(go/adbc/driver/snowflake): handle non-arrow result sets #909

Merged
merged 2 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions go/adbc/driver/snowflake/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,15 @@ func (suite *SnowflakeTests) TestStatementEmptyResultSet() {

// XXX: Snowflake doesn't return this result set as Arrow data, so the
// driver converts the JSON rows into an Arrow record instead
Comment on lines 359 to 360
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outdated comment?

_, _, err := suite.stmt.ExecuteQuery(suite.ctx)
var adbcErr adbc.Error
suite.ErrorAs(err, &adbcErr)
rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx)
suite.Require().NoError(err)
defer rdr.Release()

suite.Equal(adbc.StatusInternal, adbcErr.Code)
suite.Contains(adbcErr.Msg, "Cannot get Arrow data from this result set")
suite.True(rdr.Next())
rec := rdr.Record()
suite.Equal(n, rec.NumRows())
suite.EqualValues(25, rec.NumCols())

suite.False(rdr.Next())
suite.NoError(rdr.Err())
}
179 changes: 176 additions & 3 deletions go/adbc/driver/snowflake/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package snowflake

import (
"context"
"encoding/hex"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -243,6 +245,163 @@ func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader) (*arrow.
return out, getRecTransformer(out, transformers)
}

// rowTypesToArrowSchema builds an Arrow schema from the row-type metadata
// reported by ld.RowTypes(). It is used when a result set has no Arrow
// batches and the rows must be converted from JSON.
//
// Each field keeps the column's name and nullability and records the original
// Snowflake type name under the "SNOWFLAKE_TYPE" metadata key. Type mapping:
//
//	fixed (scale 0)              -> int64
//	fixed (scale != 0)           -> float64
//	real                         -> float64
//	date                         -> date32
//	time                         -> time64[ns]
//	timestamp_ntz, timestamp_tz  -> timestamp[ns] with no time zone
//	timestamp_ltz                -> timestamp[ns] in the process-local zone
//	binary                       -> binary
//	anything else                -> string
//
// ctx is currently unused, and the returned error is always nil.
func rowTypesToArrowSchema(ctx context.Context, ld gosnowflake.ArrowStreamLoader) (*arrow.Schema, error) {
	// Resolved lazily, and only once, the first time a timestamp_ltz column
	// is encountered.
	var loc *time.Location

	metadata := ld.RowTypes()
	fields := make([]arrow.Field, len(metadata))
	for i, srcMeta := range metadata {
		fields[i] = arrow.Field{
			Name:     srcMeta.Name,
			Nullable: srcMeta.Nullable,
			Metadata: arrow.MetadataFrom(map[string]string{
				"SNOWFLAKE_TYPE": srcMeta.Type,
			}),
		}
		switch srcMeta.Type {
		case "fixed":
			// NOTE(review): presumably a NUMBER with a non-zero scale
			// arrives as decimal text (e.g. "1.23"), which an int64 builder
			// cannot parse; such columns are surfaced as float64 instead.
			// Confirm against the JSON result format.
			if srcMeta.Scale == 0 {
				fields[i].Type = arrow.PrimitiveTypes.Int64
			} else {
				fields[i].Type = arrow.PrimitiveTypes.Float64
			}
		case "real":
			fields[i].Type = arrow.PrimitiveTypes.Float64
		case "date":
			fields[i].Type = arrow.PrimitiveTypes.Date32
		case "time":
			fields[i].Type = arrow.FixedWidthTypes.Time64ns
		case "timestamp_ntz", "timestamp_tz":
			fields[i].Type = arrow.FixedWidthTypes.Timestamp_ns
		case "timestamp_ltz":
			if loc == nil {
				loc = time.Now().Location()
			}
			fields[i].Type = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()}
		case "binary":
			fields[i].Type = arrow.BinaryTypes.Binary
		default:
			fields[i].Type = arrow.BinaryTypes.String
		}
	}
	return arrow.NewSchema(fields, nil), nil
}

// extractTimestamp parses a textual timestamp of the form
// "<seconds>[.<fraction>]" into whole seconds and nanoseconds.
//
// The fraction is right-padded with zeros to nine digits before parsing, so
// "1.5" yields (1, 500000000). A value without a '.' yields nsec == 0.
//
// A fraction longer than nine digits cannot be expressed in nanoseconds. It
// is reported as a *strconv.NumError wrapping strconv.ErrRange rather than
// reaching strings.Repeat with a negative count, which would panic. On any
// error both numeric results are zero.
func extractTimestamp(src *string) (sec, nsec int64, err error) {
	const nanoDigits = 9

	whole, frac, hasFraction := strings.Cut(*src, ".")
	sec, err = strconv.ParseInt(whole, 10, 64)
	if err != nil {
		return 0, 0, err
	}

	if !hasFraction {
		return sec, 0, nil
	}

	if len(frac) > nanoDigits {
		return 0, 0, &strconv.NumError{
			Func: "extractTimestamp",
			Num:  *src,
			Err:  strconv.ErrRange,
		}
	}

	nsec, err = strconv.ParseInt(frac+strings.Repeat("0", nanoDigits-len(frac)), 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return sec, nsec, nil
}

// jsonDataToArrow converts the JSON-encoded rows returned by ld.JSONData()
// into a single Arrow record, appending each cell to the matching field
// builder of bldr. A nil cell is appended as a null.
//
// Cells are decoded according to the builder type of their column:
//   - Time64: "<sec>[.<frac>]" stored as sec*1e9 + nsec.
//   - Timestamp with a non-UTC zone: "<sec>[.<frac>]" taken as an instant and
//     appended in nanoseconds.
//   - Timestamp whose "SNOWFLAKE_TYPE" metadata is "timestamp_ntz":
//     "<sec>[.<frac>]" stored as sec*1e9 + nsec.
//   - Any other timestamp (expected to be timestamp_tz): "<sec>[.<frac>] <offset>".
//   - Binary: hex-decoded.
//   - Everything else: parsed by the builder's AppendValueFromString.
//
// The first cell that fails to parse aborts the conversion and its error is
// returned; malformed timestamp_tz and binary cells are reported as
// adbc.Error with StatusInvalidData. ctx is currently unused.
func jsonDataToArrow(ctx context.Context, bldr *array.RecordBuilder, ld gosnowflake.ArrowStreamLoader) (arrow.Record, error) {
	rawData := ld.JSONData()
	fieldBuilders := bldr.Fields()
	for _, rec := range rawData {
		for i, col := range rec {
			field := fieldBuilders[i]

			if col == nil {
				field.AppendNull()
				continue
			}

			switch fb := field.(type) {
			case *array.Time64Builder:
				sec, nsec, err := extractTimestamp(col)
				if err != nil {
					return nil, err
				}

				fb.Append(arrow.Time64(sec*1e9 + nsec))
			case *array.TimestampBuilder:
				tz, err := fb.Type().(*arrow.TimestampType).GetZone()
				if err != nil {
					return nil, err
				}

				if tz != time.UTC {
					sec, nsec, err := extractTimestamp(col)
					if err != nil {
						return nil, err
					}
					// Use the zone resolved from the column type. The
					// original referenced loc here, which is not declared
					// until the timestamp_tz path further down.
					val := time.Unix(sec, nsec).In(tz)
					ts, err := arrow.TimestampFromTime(val, arrow.Nanosecond)
					if err != nil {
						return nil, err
					}
					fb.Append(ts)
					// Leaves the type switch only; the column loop continues.
					break
				}

				snowflakeType, _ := bldr.Schema().Field(i).Metadata.GetValue("SNOWFLAKE_TYPE")
				if snowflakeType == "timestamp_ntz" {
					sec, nsec, err := extractTimestamp(col)
					if err != nil {
						return nil, err
					}

					fb.Append(arrow.Timestamp(sec*1e9 + nsec))
					break
				}

				// "timestamp_tz" should be value + offset separated by space
				tm := strings.Split(*col, " ")
				if len(tm) != 2 {
					return nil, adbc.Error{
						Msg:        "invalid TIMESTAMP_TZ data. value doesn't consist of two numeric values separated by a space: " + *col,
						SqlState:   [5]byte{'2', '2', '0', '0', '7'},
						VendorCode: 268000,
						Code:       adbc.StatusInvalidData,
					}
				}

				sec, nsec, err := extractTimestamp(&tm[0])
				if err != nil {
					return nil, err
				}
				offset, err := strconv.ParseInt(tm[1], 10, 64)
				if err != nil {
					return nil, adbc.Error{
						Msg:        "invalid TIMESTAMP_TZ data. offset value is not an integer: " + tm[1],
						SqlState:   [5]byte{'2', '2', '0', '0', '7'},
						VendorCode: 268000,
						Code:       adbc.StatusInvalidData,
					}
				}

				// NOTE(review): presumably the offset is in minutes and
				// biased by 1440 (24h) so it is never negative — confirm
				// against gosnowflake.Location.
				loc := gosnowflake.Location(int(offset) - 1440)
				tt := time.Unix(sec, nsec).In(loc)
				ts, err := arrow.TimestampFromTime(tt, arrow.Nanosecond)
				if err != nil {
					return nil, err
				}
				fb.Append(ts)
			case *array.BinaryBuilder:
				b, err := hex.DecodeString(*col)
				if err != nil {
					return nil, adbc.Error{
						Msg:        err.Error(),
						VendorCode: 268002,
						SqlState:   [5]byte{'2', '2', '0', '0', '3'},
						Code:       adbc.StatusInvalidData,
					}
				}
				fb.Append(b)
			default:
				if err := fb.AppendValueFromString(*col); err != nil {
					return nil, err
				}
			}
		}
	}
	return bldr.NewRecord(), nil
}

type reader struct {
refCount int64
schema *arrow.Schema
Expand All @@ -263,10 +422,24 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
if len(batches) == 0 {
if ld.TotalRows() != 0 {
// XXX(https://github.com/apache/arrow-adbc/issues/863): Snowflake won't return Arrow data for certain queries
return nil, adbc.Error{
Msg: "[Snowflake] Cannot get Arrow data from this result set (see apache/arrow-adbc#863)",
Code: adbc.StatusInternal,
schema, err := rowTypesToArrowSchema(ctx, ld)
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
Code: adbc.StatusInternal,
}
}

bldr := array.NewRecordBuilder(alloc, schema)
defer bldr.Release()

rec, err := jsonDataToArrow(ctx, bldr, ld)
if err != nil {
return nil, err
}
defer rec.Release()

return array.NewRecordReader(schema, []arrow.Record{rec})
}
schema := arrow.NewSchema([]arrow.Field{}, nil)
reader, err := array.NewRecordReader(schema, []arrow.Record{})
Expand Down
56 changes: 29 additions & 27 deletions go/adbc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,39 @@ require (
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355
github.com/bluele/gcache v0.0.2
github.com/google/uuid v1.3.0
github.com/snowflakedb/gosnowflake v1.6.21
github.com/snowflakedb/gosnowflake v1.6.22
github.com/stretchr/testify v1.8.2
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
golang.org/x/tools v0.10.0
golang.org/x/tools v0.11.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
)

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.67 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.2 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.33.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
Expand All @@ -70,24 +70,24 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -102,3 +102,5 @@ require (
modernc.org/strutil v1.1.3 // indirect
modernc.org/token v1.1.0 // indirect
)

replace github.com/snowflakedb/gosnowflake => github.com/snowflakedb/gosnowflake v1.6.23-0.20230717195239-fec38ba82d2a
Loading