Skip to content

Commit

Permalink
[Postgres] Aligning array values to Debezium (#591)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 18, 2024
1 parent e98ae1a commit fa51a01
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 44 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.23.0
require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/artie-labs/transfer v1.27.49
github.com/artie-labs/transfer v1.27.50
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlE
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/artie-labs/transfer v1.27.49 h1:+/Cmlsrgl51eVYxyJX1BPX/mJQqiwSkpjUhEOTa/Bgk=
github.com/artie-labs/transfer v1.27.49/go.mod h1:9KfxO41Oe3Gf5duYDv3dIwwng1/tkfUSZOqa8LQprZ4=
github.com/artie-labs/transfer v1.27.50 h1:wBX503qjGJRMg4GjOnVjeYiGwxZBuqJQmgFtqyE1SVw=
github.com/artie-labs/transfer v1.27.50/go.mod h1:9KfxO41Oe3Gf5duYDv3dIwwng1/tkfUSZOqa8LQprZ4=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
Expand Down
24 changes: 22 additions & 2 deletions integration_tests/postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ CREATE TABLE %s (
c_geometry geometry,
c_geography geography(Point),
-- Arrays
c_int_array int[]
c_int_array int[],
c_jsonb_array jsonb[]
)
`

Expand Down Expand Up @@ -270,7 +271,9 @@ INSERT INTO %s VALUES (
-- c_geography
'POINT(-118.4079 33.9434)',
-- c_int_array
'{0,2,4,6}'
'{0,2,4,6}',
-- c_jsonb_array
ARRAY['{"key1": "value1"}'::jsonb,'{"key2": "value2"}'::jsonb]
)
`

Expand Down Expand Up @@ -706,6 +709,19 @@ const expectedPayloadTemplate = `{
"field": "c_int_array",
"name": "",
"parameters": null
},
{
"type": "array",
"optional": false,
"default": null,
"field": "c_jsonb_array",
"name": "",
"parameters": null,
"items": {
"type": "",
"optional": false,
"name": "io.debezium.data.Json"
}
}
],
"optional": false,
Expand Down Expand Up @@ -761,6 +777,10 @@ const expectedPayloadTemplate = `{
"c_interval": 7200000000,
"c_json": "{\"foo\": \"bar\", \"baz\": 1234}",
"c_jsonb": "{\"baz\": 4321, \"foo\": \"bar\"}",
"c_jsonb_array": [
"{\"key1\":\"value1\"}",
"{\"key2\":\"value2\"}"
],
"c_macaddr": "12:34:56:78:90:ab",
"c_macaddr8": "12:34:56:78:90:ab:cd:ef",
"c_money": "T30t",
Expand Down
56 changes: 51 additions & 5 deletions lib/debezium/converters/array.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,70 @@
package converters

import (
"encoding/json"
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/debezium"
)

type ArrayConverter struct{}
func NewArrayConverter(elementType string) ArrayConverter {
var json bool
for _, el := range []string{"json", "jsonb"} {
if strings.EqualFold(elementType, el) {
json = true
break
}
}

return ArrayConverter{json: json}
}

type ArrayConverter struct {
json bool
}

func (a ArrayConverter) ToField(name string) debezium.Field {
var item *debezium.Item
if a.json {
item = &debezium.Item{
DebeziumType: debezium.JSON,
}
}

func (ArrayConverter) ToField(name string) debezium.Field {
return debezium.Field{
FieldName: name,
Type: debezium.Array,
FieldName: name,
Type: debezium.Array,
ItemsMetadata: item,
}
}

func (ArrayConverter) Convert(value any) (any, error) {
func (a ArrayConverter) Convert(value any) (any, error) {
arrayValue, ok := value.([]any)
if !ok {
return nil, fmt.Errorf("expected []any got %T with value: %v", value, value)
}

if a.json {
// If json is enabled, we should parse the array elements to JSON strings
var elements []any
for _, el := range arrayValue {
switch el.(type) {
case string:
// Already JSON string, so we can skip the marshalling
elements = append(elements, el)
default:
parsedValue, err := json.Marshal(el)
if err != nil {
return nil, fmt.Errorf("failed to marshal value: %v", err)
}

elements = append(elements, string(parsedValue))
}
}

return elements, nil
}

return arrayValue, nil
}
83 changes: 83 additions & 0 deletions lib/debezium/converters/array_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package converters

import (
"github.com/artie-labs/transfer/lib/debezium"
"github.com/stretchr/testify/assert"
"testing"
)

func TestArrayConverter_ToField(t *testing.T) {
{
// String
converter := NewArrayConverter("string")
field := converter.ToField("name")
assert.Equal(t, debezium.Field{
FieldName: "name",
Type: debezium.Array,
ItemsMetadata: nil,
}, field)
}
{
// json[]
converter := NewArrayConverter("json")
field := converter.ToField("name")
assert.Equal(t, debezium.Field{
FieldName: "name",
Type: debezium.Array,
ItemsMetadata: &debezium.Item{
DebeziumType: debezium.JSON,
},
}, field)
}
{
// jsonb[]
converter := NewArrayConverter("jsonb")
field := converter.ToField("name")
assert.Equal(t, debezium.Field{
FieldName: "name",
Type: debezium.Array,
ItemsMetadata: &debezium.Item{
DebeziumType: debezium.JSON,
},
}, field)
}
}

func TestArrayConverter(t *testing.T) {
{
// Array of strings
list := []any{"a", "b", "c"}
converter := NewArrayConverter("string")
converted, err := converter.Convert(list)
assert.NoError(t, err)

returnedValue, err := converter.ToField("name").ParseValue(converted)
assert.NoError(t, err)
assert.Equal(t, list, returnedValue)
}
{
// Array of jsonb[]
listOfObjects := []any{map[string]any{"a": "b"}, map[string]any{"c": "d"}, []any{"e", "f"}}
listOfJsonStrings := []any{`{"a": "b"}`, `{"c": "d"}`, `["e", "f"]`}
{
// Invalid - item type is JSON objects
converter := NewArrayConverter("jsonb")
converted, err := converter.Convert(listOfObjects)
assert.NoError(t, err)

returnedValue, err := converter.ToField("name").ParseValue(converted)
assert.NoError(t, err)
assert.Equal(t, listOfObjects, returnedValue)
}
{
// Valid - item type is JSON strings
converter := NewArrayConverter("jsonb")
converted, err := converter.Convert(listOfJsonStrings)
assert.NoError(t, err)

returnedValue, err := converter.ToField("name").ParseValue(converted)
assert.NoError(t, err)
assert.Equal(t, listOfObjects, returnedValue)
}
}
}
39 changes: 31 additions & 8 deletions lib/postgres/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,27 @@ type Opts struct {
Scale uint16
Precision int
CharMaxLength int
ElementType *string
}

type Column = column.Column[DataType, Opts]

const describeTableQuery = `
SELECT column_name, data_type, numeric_precision, numeric_scale, udt_name, character_maximum_length
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2`
SELECT
c.column_name,
c.data_type,
c.numeric_precision,
c.numeric_scale,
c.udt_name,
c.character_maximum_length,
CASE
WHEN c.data_type = 'ARRAY' THEN t.typname
ELSE NULL
END AS element_type
FROM information_schema.columns c
LEFT JOIN pg_type t ON c.udt_name = t.typname
WHERE c.table_schema = $1 AND c.table_name = $2;
`

func DescribeTable(db *sql.DB, _schema, table string) ([]Column, error) {
query := strings.TrimSpace(describeTableQuery)
Expand All @@ -76,8 +89,8 @@ func DescribeTable(db *sql.DB, _schema, table string) ([]Column, error) {
var numericScale *uint16
var udtName *string
var charMaxLength *int
err = rows.Scan(&colName, &colType, &numericPrecision, &numericScale, &udtName, &charMaxLength)
if err != nil {
var elementType *string
if err = rows.Scan(&colName, &colType, &numericPrecision, &numericScale, &udtName, &charMaxLength, &elementType); err != nil {
return nil, err
}

Expand All @@ -88,7 +101,7 @@ func DescribeTable(db *sql.DB, _schema, table string) ([]Column, error) {
continue
}

dataType, opts, err := parseColumnDataType(colType, numericPrecision, numericScale, charMaxLength, udtName)
dataType, opts, err := parseColumnDataType(colType, numericPrecision, numericScale, charMaxLength, udtName, elementType)
if err != nil {
return nil, fmt.Errorf("unable to identify type %q for column %q", colType, colName)
}
Expand All @@ -102,7 +115,7 @@ func DescribeTable(db *sql.DB, _schema, table string) ([]Column, error) {
return cols, nil
}

func parseColumnDataType(colKind string, precision *int, scale *uint16, charMaxLength *int, udtName *string) (DataType, *Opts, error) {
func parseColumnDataType(colKind string, precision *int, scale *uint16, charMaxLength *int, udtName *string, elementType *string) (DataType, *Opts, error) {
colKind = strings.ToLower(colKind)
switch colKind {
case "bit":
Expand Down Expand Up @@ -152,7 +165,17 @@ func parseColumnDataType(colKind string, precision *int, scale *uint16, charMaxL
case "uuid":
return UUID, nil, nil
case "array":
return Array, nil, nil
if elementType == nil {
return -1, nil, fmt.Errorf("missing element type for array column")
}

// Element type should have _ prefix, so we need to remove it
if !strings.HasPrefix(*elementType, "_") {
return -1, nil, fmt.Errorf("expected element type to have _ prefix: %q", *elementType)
}

*elementType = (*elementType)[1:]
return Array, &Opts{ElementType: elementType}, nil
case "json", "jsonb":
return JSON, nil, nil
case "point":
Expand Down
Loading

0 comments on commit fa51a01

Please sign in to comment.