Skip to content

Commit

Permalink
SNOW-1460203 Insert structured objects
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pfus committed Jun 10, 2024
1 parent 4b6794f commit 484a24b
Show file tree
Hide file tree
Showing 14 changed files with 1,402 additions and 165 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:
pull_request:
branches:
- master
- SNOW-*
schedule:
- cron: '7 3 * * *'
workflow_dispatch:
Expand Down
12 changes: 12 additions & 0 deletions assert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func assertStringContainsF(t *testing.T, actual string, expectedToContain string
fatalOnNonEmpty(t, validateStringContains(actual, expectedToContain, descriptions...))
}

func assertEmptyStringE(t *testing.T, actual string, descriptions ...string) {
errorOnNonEmpty(t, validateEmptyString(actual, descriptions...))
}

func assertHasPrefixE(t *testing.T, actual string, expectedPrefix string, descriptions ...string) {
errorOnNonEmpty(t, validateHasPrefix(actual, expectedPrefix, descriptions...))
}
Expand Down Expand Up @@ -176,6 +180,14 @@ func validateStringContains(actual string, expectedToContain string, description
return fmt.Sprintf("expected \"%s\" to contain \"%s\" but did not. %s", actual, expectedToContain, desc)
}

// validateEmptyString returns "" when actual is empty; otherwise it returns a
// failure message describing the unexpected value, suffixed with the joined
// optional descriptions.
func validateEmptyString(actual string, descriptions ...string) string {
	if actual != "" {
		desc := joinDescriptions(descriptions...)
		return fmt.Sprintf("expected \"%s\" to be empty, but was not. %s", actual, desc)
	}
	return ""
}

func validateHasPrefix(actual string, expectedPrefix string, descriptions ...string) string {
if strings.HasPrefix(actual, expectedPrefix) {
return ""
Expand Down
30 changes: 25 additions & 5 deletions bind_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type bindUploader struct {
arrayBindStage string
}

// bindingSchema describes the server-side type of a bound parameter for
// structured-type bindings; it is serialized as JSON into the exec request.
type bindingSchema struct {
	Typ      string          `json:"type"`     // Snowflake type name, e.g. "object"
	Nullable bool            `json:"nullable"` // whether the bound value may be NULL
	Fields   []fieldMetadata `json:"fields"`   // per-field metadata for structured types
}

func (bu *bindUploader) upload(bindings []driver.NamedValue) (*execResponse, error) {
bindingRows, err := bu.buildRowsAsBytes(bindings)
if err != nil {
Expand Down Expand Up @@ -204,7 +210,7 @@ func (sc *snowflakeConn) processBindings(
req.BindStage = uploader.stagePath
} else {
var err error
req.Bindings, err = getBindValues(bindings)
req.Bindings, err = getBindValues(bindings, sc.cfg.Params)
if err != nil {
return err
}
Expand All @@ -213,7 +219,7 @@ func (sc *snowflakeConn) processBindings(
return nil
}

func getBindValues(bindings []driver.NamedValue) (map[string]execBindParameter, error) {
func getBindValues(bindings []driver.NamedValue, params map[string]*string) (map[string]execBindParameter, error) {
tsmode := timestampNtzType
idx := 1
var err error
Expand All @@ -231,21 +237,27 @@ func getBindValues(bindings []driver.NamedValue) (map[string]execBindParameter,
}
} else {
var val interface{}
var schema *bindingSchema
fmt := ""
if t == sliceType {
// retrieve array binding data
t, val = snowflakeArrayToString(&binding, false)
} else {
val, err = valueToString(binding.Value, tsmode)
val, fmt, schema, err = valueToString(binding.Value, tsmode, params)
if err != nil {
return nil, err
}
}
if t == nullType || t == unSupportedType {
t = textType // if null or not supported, pass to GS as text
} else if t == nullObjectType {
t = objectType
}
bindValues[bindingName(binding, idx)] = execBindParameter{
Type: t.String(),
Value: val,
Type: t.String(),
Value: val,
Format: fmt,
Schema: schema,
}
idx++
}
Expand Down Expand Up @@ -322,3 +334,11 @@ func supportedNullBind(nv *driver.NamedValue) bool {
}
return false
}

// supportedStructuredObjectWriterBind reports whether the named value can be
// bound as a structured object: either a StructuredObjectWriter instance or a
// reflect.Type (presumably describing a typed NULL object — see the
// nullObjectType handling in goTypeToSnowflake).
func supportedStructuredObjectWriterBind(nv *driver.NamedValue) bool {
	switch nv.Value.(type) {
	case StructuredObjectWriter, reflect.Type:
		return true
	default:
		return false
	}
}
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (sc *snowflakeConn) Ping(ctx context.Context) error {
// CheckNamedValue determines which types are handled by this driver aside from
// the instances captured by driver.Value
func (sc *snowflakeConn) CheckNamedValue(nv *driver.NamedValue) error {
if supportedNullBind(nv) || supportedArrayBind(nv) {
if supportedNullBind(nv) || supportedArrayBind(nv) || supportedStructuredObjectWriterBind(nv) {
return nil
}
return driver.ErrSkip
Expand Down
91 changes: 62 additions & 29 deletions converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func goTypeToSnowflake(v driver.Value, tsmode snowflakeType) snowflakeType {
}
if supportedArrayBind(&driver.NamedValue{Value: v}) {
return sliceType
} else if _, ok := v.(StructuredObjectWriter); ok {
return objectType
} else if _, ok := v.(reflect.Type); ok && tsmode == nullObjectType {
return nullObjectType
}
return unSupportedType
}
Expand Down Expand Up @@ -206,99 +210,129 @@ func snowflakeTypeToGoForMaps[K comparable](ctx context.Context, valueMetadata f

// valueToString converts arbitrary golang type to a string. This is mainly used in binding data with placeholders
// in queries.
func valueToString(v driver.Value, tsmode snowflakeType) (*string, error) {
func valueToString(v driver.Value, tsmode snowflakeType, params map[string]*string) (*string, string, *bindingSchema, error) {
logger.Debugf("TYPE: %v, %v", reflect.TypeOf(v), reflect.ValueOf(v))
if v == nil {
return nil, nil
return nil, "", nil, nil
}
v1 := reflect.ValueOf(v)
switch v1.Kind() {
case reflect.Bool:
s := strconv.FormatBool(v1.Bool())
return &s, nil
return &s, "", nil, nil
case reflect.Int64:
s := strconv.FormatInt(v1.Int(), 10)
return &s, nil
return &s, "", nil, nil
case reflect.Float64:
s := strconv.FormatFloat(v1.Float(), 'g', -1, 32)
return &s, nil
return &s, "", nil, nil
case reflect.String:
s := v1.String()
return &s, nil
return &s, "", nil, nil
case reflect.Slice, reflect.Map:
if v1.IsNil() {
return nil, nil
return nil, "", nil, nil
}
if bd, ok := v.([]byte); ok {
if tsmode == binaryType {
s := hex.EncodeToString(bd)
return &s, nil
return &s, "", nil, nil
}
}
// TODO: is this good enough?
s := v1.String()
return &s, nil
return &s, "", nil, nil
case reflect.Struct:
switch typedVal := v.(type) {
case time.Time:
return timeTypeValueToString(typedVal, tsmode)
case sql.NullTime:
if !typedVal.Valid {
return nil, nil
return nil, "", nil, nil
}
return timeTypeValueToString(typedVal.Time, tsmode)
case sql.NullBool:
if !typedVal.Valid {
return nil, nil
return nil, "", nil, nil
}
s := strconv.FormatBool(typedVal.Bool)
return &s, nil
return &s, "", nil, nil
case sql.NullInt64:
if !typedVal.Valid {
return nil, nil
return nil, "", nil, nil
}
s := strconv.FormatInt(typedVal.Int64, 10)
return &s, nil
return &s, "", nil, nil
case sql.NullFloat64:
if !typedVal.Valid {
return nil, nil
return nil, "", nil, nil
}
s := strconv.FormatFloat(typedVal.Float64, 'g', -1, 32)
return &s, nil
return &s, "", nil, nil
case sql.NullString:
if !typedVal.Valid {
return nil, nil
return nil, "", nil, nil
}
return &typedVal.String, nil
return &typedVal.String, "", nil, nil
}
}
if sow, ok := v.(StructuredObjectWriter); ok {
sowc := &structuredObjectWriterContext{}
sowc.init(params)
err := sow.Write(sowc)
if err != nil {
return nil, "", nil, err
}
jsonBytes, err := json.Marshal(sowc.values)
if err != nil {
return nil, "", nil, err
}
jsonString := string(jsonBytes)
schema := bindingSchema{
Typ: "object",
Nullable: true,
Fields: sowc.toFields(),
}
return &jsonString, "json", &schema, nil
} else if typ, ok := v.(reflect.Type); ok {
sowc, err := buildSowcFromType(params, typ)
if err != nil {
return nil, "", nil, err
}
schema := bindingSchema{
Typ: "object",
Nullable: true,
Fields: sowc.toFields(),
}
return nil, "json", &schema, nil
}
return nil, fmt.Errorf("unsupported type: %v", v1.Kind())
return nil, "", nil, fmt.Errorf("unsupported type: %v", v1.Kind())
}

func timeTypeValueToString(tm time.Time, tsmode snowflakeType) (*string, error) {
func timeTypeValueToString(tm time.Time, tsmode snowflakeType) (*string, string, *bindingSchema, error) {
switch tsmode {
case dateType:
_, offset := tm.Zone()
tm = tm.Add(time.Second * time.Duration(offset))
s := strconv.FormatInt(tm.Unix()*1000, 10)
return &s, nil
return &s, "", nil, nil
case timeType:
s := fmt.Sprintf("%d",
(tm.Hour()*3600+tm.Minute()*60+tm.Second())*1e9+tm.Nanosecond())
return &s, nil
return &s, "", nil, nil
case timestampNtzType, timestampLtzType:
unixTime, _ := new(big.Int).SetString(fmt.Sprintf("%d", tm.Unix()), 10)
m, _ := new(big.Int).SetString(strconv.FormatInt(1e9, 10), 10)
unixTime.Mul(unixTime, m)
tmNanos, _ := new(big.Int).SetString(fmt.Sprintf("%d", tm.Nanosecond()), 10)
s := unixTime.Add(unixTime, tmNanos).String()
return &s, nil
return &s, "", nil, nil
case timestampTzType:
_, offset := tm.Zone()
s := fmt.Sprintf("%v %v", tm.UnixNano(), offset/60+1440)
return &s, nil
return &s, "", nil, nil
}
return nil, fmt.Errorf("unsupported time type: %v", tsmode)
return nil, "", nil, fmt.Errorf("unsupported time type: %v", tsmode)
}

// extractTimestamp extracts the internal timestamp data to epoch time in seconds and milliseconds
Expand Down Expand Up @@ -564,7 +598,7 @@ func jsonToMapWithKeyType[K comparable](ctx context.Context, valueMetadata field
})
case "date", "time", "timestamp_tz", "timestamp_ltz", "timestamp_ntz":
return buildMapValues[K, sql.NullTime, time.Time](mapValuesNullableEnabled, m, func(v any) (time.Time, error) {
sfFormat, err := dateTimeFormatByType(valueMetadata.Type, params)
sfFormat, err := dateTimeOutputFormatByType(valueMetadata.Type, params)
if err != nil {
return time.Time{}, err
}
Expand All @@ -577,7 +611,7 @@ func jsonToMapWithKeyType[K comparable](ctx context.Context, valueMetadata field
if v == nil {
return sql.NullTime{Valid: false}, nil
}
sfFormat, err := dateTimeFormatByType(valueMetadata.Type, params)
sfFormat, err := dateTimeOutputFormatByType(valueMetadata.Type, params)
if err != nil {
return sql.NullTime{}, err
}
Expand Down Expand Up @@ -682,7 +716,7 @@ func buildStructuredArray(ctx context.Context, fieldMetadata fieldMetadata, srcV
})
case "time", "date", "timestamp_ltz", "timestamp_ntz", "timestamp_tz":
return copyArrayAndConvert[time.Time](srcValue, func(input any) (time.Time, error) {
sfFormat, err := dateTimeFormatByType(fieldMetadata.Type, params)
sfFormat, err := dateTimeOutputFormatByType(fieldMetadata.Type, params)
if err != nil {
return time.Time{}, err
}
Expand Down Expand Up @@ -2266,7 +2300,6 @@ func recordToSchemaSingleField(fieldMetadata fieldMetadata, f arrow.Field, withH
if f.Type.ID() == arrow.STRUCT {
var internalFields []arrow.Field
for idx, internalField := range f.Type.(*arrow.StructType).Fields() {
println(fieldMetadata.String())
internalConverted, convertedDataType := recordToSchemaSingleField(fieldMetadata.Fields[idx], internalField, withHigherPrecision, timestampOption, loc)
converted = converted || internalConverted
if internalConverted {
Expand Down
Loading

0 comments on commit 484a24b

Please sign in to comment.