Skip to content

Commit

Permalink
[Redshift] Better handling around TOAST columns (#1080)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 12, 2024
1 parent 6971259 commit 6cd5cd1
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
11 changes: 7 additions & 4 deletions clients/redshift/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ func (RedshiftDialect) IsTableDoesNotExistErr(_ error) bool {
}

// BuildIsNotToastValueExpression returns a SQL boolean expression for the given
// column (qualified with tableAlias and quoted via sql.QuoteTableAliasColumn)
// that checks the column against constants.ToastUnavailableValuePlaceholder.
// Every variant wraps the comparison in COALESCE(..., true) so that a NULL
// comparison result is treated as "not a TOAST placeholder".
//
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// implementations are interleaved. The `if ... != ...` lines and the trailing
// `!=` return look like the removed pre-change code; the `toastedValue`
// assignment and the `switch` look like the added code — confirm against the
// repository before editing.
func (rd RedshiftDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
// LIKE pattern: the placeholder surrounded by % so it matches anywhere in the value.
// NOTE(review): `_` is a single-character wildcard in SQL LIKE, so the underscores
// in the placeholder presumably match any character unless escaped — verify
// whether an ESCAPE clause is needed.
toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%"
colName := sql.QuoteTableAliasColumn(tableAlias, column, rd)
// Appears to be the removed variant: exact comparison of a Struct column against
// a JSON_PARSE'd {"key": <placeholder>} object.
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(%s != JSON_PARSE('{"key":"%s"}'), true)`,
colName, constants.ToastUnavailableValuePlaceholder)

switch column.KindDetails {
case typing.Struct, typing.Array:
// Struct and Array columns are passed through JSON_SERIALIZE before the LIKE match.
return fmt.Sprintf("COALESCE(JSON_SERIALIZE(%s) NOT LIKE '%s', TRUE)", colName, toastedValue)
default:
// Every other kind is matched with LIKE directly on the column.
return fmt.Sprintf(`COALESCE(%s NOT LIKE '%s', TRUE)`, colName, toastedValue)
}
// Appears to be the removed variant: exact inequality against the placeholder.
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (rd RedshiftDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, _ []string) string {
Expand Down
47 changes: 34 additions & 13 deletions clients/redshift/dialect/dialect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,35 @@ func TestRedshiftDialect_BuildDropColumnQuery(t *testing.T) {
}

// TestRedshiftDialect_BuildIsNotToastValueExpression pins the exact SQL string
// that RedshiftDialect.BuildIsNotToastValueExpression produces for the "tbl"
// alias across several column kinds.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The first
// two assertions (expecting `!=` / JSON_PARSE) look like the removed
// expectations; the braced cases below (expecting NOT LIKE / JSON_SERIALIZE)
// look like the added ones — confirm against the repository.
func TestRedshiftDialect_BuildIsNotToastValueExpression(t *testing.T) {
// Appears to be a removed expectation: exact inequality for an untyped column.
assert.Equal(t,
`COALESCE(tbl."bar" != '__debezium_unavailable_value', true)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)),
)
// Appears to be a removed expectation: JSON_PARSE comparison for a Struct column.
assert.Equal(t,
`COALESCE(tbl."foo" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
{
// Typing is not specified
assert.Equal(t,
`COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Invalid)),
)
}
{
// Struct
assert.Equal(t,
`COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)),
)
}
{
// Array
assert.Equal(t,
`COALESCE(JSON_SERIALIZE(tbl."foo") NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Array)),
)
}
{
// String
assert.Equal(t,
`COALESCE(tbl."foo" NOT LIKE '%__debezium_unavailable_value%', TRUE)`,
RedshiftDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.String)),
)
}

}

func TestRedshiftDialect_BuildMergeInsertQuery(t *testing.T) {
Expand Down Expand Up @@ -231,7 +252,7 @@ func TestRedshiftDialect_BuildMergeQueries_SkipDelete(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])
}

Expand Down Expand Up @@ -259,7 +280,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDelete(t *testing.T) {
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" WHERE tgt."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
parts[1])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_only_set_delete", false) = true;`,
Expand Down Expand Up @@ -289,7 +310,7 @@ func TestRedshiftDialect_BuildMergeQueries_SoftDeleteComposite(t *testing.T) {
`INSERT INTO public.tableName ("id","email","first_name","last_name","created_at","toast_text","__artie_delete") SELECT stg."id",stg."email",stg."first_name",stg."last_name",stg."created_at",stg."toast_text",stg."__artie_delete" FROM public.tableName__temp AS stg LEFT JOIN public.tableName AS tgt ON tgt."id" = stg."id" AND tgt."email" = stg."email" WHERE tgt."id" IS NULL;`,
parts[0])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END,"__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = false;`,
parts[1])
assert.Equal(t,
`UPDATE public.tableName AS tgt SET "__artie_delete"=stg."__artie_delete" FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_only_set_delete", false) = true;`,
Expand Down Expand Up @@ -323,7 +344,7 @@ func TestRedshiftDialect_BuildMergeQueries(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
Expand Down Expand Up @@ -355,7 +376,7 @@ func TestRedshiftDialect_BuildMergeQueries_CompositeKey(t *testing.T) {
parts[0])

assert.Equal(t,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" != '__debezium_unavailable_value', true) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`,
`UPDATE public.tableName AS tgt SET "id"=stg."id","email"=stg."email","first_name"=stg."first_name","last_name"=stg."last_name","created_at"=stg."created_at","toast_text"= CASE WHEN COALESCE(stg."toast_text" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."toast_text" ELSE tgt."toast_text" END FROM public.tableName__temp AS stg WHERE tgt."id" = stg."id" AND tgt."email" = stg."email" AND COALESCE(stg."__artie_delete", false) = false;`,
parts[1])

assert.Equal(t,
Expand Down
2 changes: 1 addition & 1 deletion lib/sql/tests/columns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestBuildColumnsUpdateFragment_Redshift(t *testing.T) {
{
name: "struct, string and toast string",
columns: lastCaseColTypes,
expectedString: `"a1"= CASE WHEN COALESCE(stg."a1" != JSON_PARSE('{"key":"__debezium_unavailable_value"}'), true) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" != '__debezium_unavailable_value', true) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`,
expectedString: `"a1"= CASE WHEN COALESCE(JSON_SERIALIZE(stg."a1") NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."a1" ELSE tgt."a1" END,"b2"= CASE WHEN COALESCE(stg."b2" NOT LIKE '%__debezium_unavailable_value%', TRUE) THEN stg."b2" ELSE tgt."b2" END,"c3"=stg."c3"`,
},
}

Expand Down

0 comments on commit 6cd5cd1

Please sign in to comment.