Skip to content

Commit

Permalink
[Debezium] Better support around JSON arrays (#1072)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 12, 2024
1 parent c399574 commit cdd524f
Showing 4 changed files with 56 additions and 3 deletions.
33 changes: 31 additions & 2 deletions lib/debezium/converters/basic.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package converters

import (
"encoding/base64"
"encoding/json"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
@@ -88,16 +89,44 @@ func (Float64) Convert(value any) (any, error) {
}
}

type Array struct{}
func NewArray(json bool) Array {
return Array{json: json}
}

type Array struct {
json bool
}

func (Array) ToKindDetails() typing.KindDetails {
return typing.Array
}

func (Array) Convert(value any) (any, error) {
func (a Array) Convert(value any) (any, error) {
if fmt.Sprint(value) == fmt.Sprintf("[%s]", constants.ToastUnavailableValuePlaceholder) {
return constants.ToastUnavailableValuePlaceholder, nil
}

if a.json {
// Debezium will give us a list of JSON strings. We will then need to convert them to JSON objects.
elements, ok := value.([]any)
if !ok {
return nil, fmt.Errorf("expected []any, got %T", value)
}

convertedElements := make([]any, len(elements))
for i, element := range elements {
if castedElement, ok := element.(string); ok {
var obj any
if err := json.Unmarshal([]byte(castedElement), &obj); err != nil {
return nil, err
}

convertedElements[i] = obj
}
}

return convertedElements, nil
}

return value, nil
}
7 changes: 7 additions & 0 deletions lib/debezium/converters/basic_test.go
Original file line number Diff line number Diff line change
@@ -110,4 +110,11 @@ func TestArray_Convert(t *testing.T) {
assert.Equal(t, constants.ToastUnavailableValuePlaceholder, value)
}
}
{
// Array of JSON objects
value, err := NewArray(true).Convert([]any{"{\"body\": \"they are on to us\", \"sender\": \"pablo\"}"})
assert.NoError(t, err)
assert.Len(t, value.([]any), 1)
assert.Equal(t, map[string]any{"body": "they are on to us", "sender": "pablo"}, value.([]any)[0])
}
}
10 changes: 9 additions & 1 deletion lib/debezium/schema.go
Original file line number Diff line number Diff line change
@@ -53,13 +53,21 @@ const (
Map FieldType = "map"
)

type Item struct {
Type FieldType `json:"type"`
Optional bool `json:"optional"`
DebeziumType SupportedDebeziumType `json:"name"`
}

type Field struct {
Type FieldType `json:"type"`
Optional bool `json:"optional"`
Default any `json:"default"`
FieldName string `json:"field"`
DebeziumType SupportedDebeziumType `json:"name"`
Parameters map[string]any `json:"parameters"`
// [ItemsMetadata] is only populated if the literal type is an array.
ItemsMetadata Item `json:"items"`
}

func (f Field) GetScaleAndPrecision() (int32, *int32, error) {
@@ -138,7 +146,7 @@ func (f Field) ToValueConverter() (converters.ValueConverter, error) {

switch f.Type {
case Array:
return converters.Array{}, nil
return converters.NewArray(f.ItemsMetadata.DebeziumType == JSON), nil
case Double, Float:
return converters.Float64{}, nil
}
9 changes: 9 additions & 0 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
@@ -172,6 +172,15 @@ func TestField_ParseValue(t *testing.T) {
assert.Equal(t, `[[{"foo":"bar"}],[{"hello":"world"},{"dusty":"the mini aussie"}]]`, val)
}
}
{
// Array
field := Field{Type: Array, ItemsMetadata: Item{DebeziumType: JSON}}
value, err := field.ParseValue([]any{`{"foo": "bar", "foo": "bar"}`, `{"hello": "world"}`})
assert.NoError(t, err)
assert.Len(t, value.([]any), 2)
assert.Equal(t, map[string]any{"foo": "bar"}, value.([]any)[0])
assert.Equal(t, map[string]any{"hello": "world"}, value.([]any)[1])
}
{
// Int32
value, err := Field{Type: Int32}.ParseValue(float64(3))

0 comments on commit cdd524f

Please sign in to comment.