From de27f69270e1806fe3d49a15cbff2f229c858442 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 14 Dec 2024 19:50:15 -0800 Subject: [PATCH 1/4] WIP. --- clients/databricks/dialect/dialect.go | 9 +++++--- clients/databricks/dialect/dialect_test.go | 24 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 1a54254f8..b4b8fdb5b 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -31,11 +31,14 @@ func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool { } func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { + toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%" colName := sql.QuoteTableAliasColumn(tableAlias, column, d) - if column.KindDetails == typing.Struct { - return fmt.Sprintf("COALESCE(%s != {'key': '%s'}, true)", colName, constants.ToastUnavailableValuePlaceholder) + switch column.KindDetails { + case typing.String: + return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue) + default: + return fmt.Sprintf("COALESCE(TO_VARCHAR(%s) NOT LIKE '%s', TRUE)", colName, toastedValue) } - return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } func (DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { diff --git a/clients/databricks/dialect/dialect_test.go b/clients/databricks/dialect/dialect_test.go index 6d3c3fc59..053b7a7e4 100644 --- a/clients/databricks/dialect/dialect_test.go +++ b/clients/databricks/dialect/dialect_test.go @@ -50,6 +50,30 @@ func TestDatabricksDialect_IsTableDoesNotExistErr(t *testing.T) { } } +func TestDatabricks_BuildIsNotToastValueExpression(t *testing.T) { + { + // Unspecified data type + assert.Equal(t, + "COALESCE(TO_VARCHAR(tbl.`bar`) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)), + ) + } + { + // Structs + assert.Equal(t, + "COALESCE(TO_VARCHAR(tbl.`foo`) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)), + ) + } + { + // String + assert.Equal(t, + "COALESCE(tbl.`bar` NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.String)), + ) + } +} + func TestDatabricksDialect_BuildCreateTableQuery(t *testing.T) { fakeTableID := &mocks.FakeTableIdentifier{} fakeTableID.FullyQualifiedNameReturns("{TABLE}") From 64f85be51050e08498fc240776b53b5d8d7eb483 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 15 Dec 2024 16:08:26 -0800 Subject: [PATCH 2/4] Escape correctly. --- clients/databricks/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 74d86b2c7..02df68b94 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -122,7 +122,7 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio // Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html // We'll need \\\\N here because we need to string escape. - copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath()) + copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('escape' = '"', 'delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath()) if _, err = s.ExecContext(ctx, copyCommand); err != nil { return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err) } From 739d942bd5fd66a43d3c99082879f0f8ad4bdc32 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 15 Dec 2024 16:34:41 -0800 Subject: [PATCH 3/4] Update. --- clients/databricks/dialect/dialect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index b4b8fdb5b..e0ac07ca5 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -37,7 +37,7 @@ func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.T case typing.String: return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue) default: - return fmt.Sprintf("COALESCE(TO_VARCHAR(%s) NOT LIKE '%s', TRUE)", colName, toastedValue) + return fmt.Sprintf("COALESCE(CAST (%s AS STRING) NOT LIKE '%s', TRUE)", colName, toastedValue) } } From b0860de63a592cb84e1f827562aee374f6fb19ec Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sun, 15 Dec 2024 16:35:43 -0800 Subject: [PATCH 4/4] Clean up. --- clients/databricks/dialect/dialect.go | 2 +- clients/databricks/dialect/dialect_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index e0ac07ca5..0bcd0e5f2 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -37,7 +37,7 @@ func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.T case typing.String: return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue) default: - return fmt.Sprintf("COALESCE(CAST (%s AS STRING) NOT LIKE '%s', TRUE)", colName, toastedValue) + return fmt.Sprintf("COALESCE(CAST(%s AS STRING) NOT LIKE '%s', TRUE)", colName, toastedValue) } } diff --git a/clients/databricks/dialect/dialect_test.go b/clients/databricks/dialect/dialect_test.go index 053b7a7e4..e249b7acc 100644 --- a/clients/databricks/dialect/dialect_test.go +++ b/clients/databricks/dialect/dialect_test.go @@ -54,14 +54,14 @@ func TestDatabricks_BuildIsNotToastValueExpression(t *testing.T) { { // Unspecified data type assert.Equal(t, - "COALESCE(TO_VARCHAR(tbl.`bar`) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + "COALESCE(CAST(tbl.`bar` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)", DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)), ) } { // Structs assert.Equal(t, - "COALESCE(TO_VARCHAR(tbl.`foo`) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + "COALESCE(CAST(tbl.`foo` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)", DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)), ) }