diff --git a/clients/databricks/dialect/dialect.go b/clients/databricks/dialect/dialect.go index 1a54254f..0bcd0e5f 100644 --- a/clients/databricks/dialect/dialect.go +++ b/clients/databricks/dialect/dialect.go @@ -31,11 +31,14 @@ func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool { } func (d DatabricksDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { + toastedValue := "%" + constants.ToastUnavailableValuePlaceholder + "%" colName := sql.QuoteTableAliasColumn(tableAlias, column, d) - if column.KindDetails == typing.Struct { - return fmt.Sprintf("COALESCE(%s != {'key': '%s'}, true)", colName, constants.ToastUnavailableValuePlaceholder) + switch column.KindDetails { + case typing.String: + return fmt.Sprintf("COALESCE(%s NOT LIKE '%s', TRUE)", colName, toastedValue) + default: + return fmt.Sprintf("COALESCE(CAST(%s AS STRING) NOT LIKE '%s', TRUE)", colName, toastedValue) } - return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } func (DatabricksDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { diff --git a/clients/databricks/dialect/dialect_test.go b/clients/databricks/dialect/dialect_test.go index 6d3c3fc5..e249b7ac 100644 --- a/clients/databricks/dialect/dialect_test.go +++ b/clients/databricks/dialect/dialect_test.go @@ -50,6 +50,30 @@ func TestDatabricksDialect_IsTableDoesNotExistErr(t *testing.T) { } } +func TestDatabricks_BuildIsNotToastValueExpression(t *testing.T) { + { + // Unspecified data type + assert.Equal(t, + "COALESCE(CAST(tbl.`bar` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.Invalid)), + ) + } + { + // Structs + assert.Equal(t, + "COALESCE(CAST(tbl.`foo` AS STRING) NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("foo", typing.Struct)), + ) + } + { + // String + assert.Equal(t, + "COALESCE(tbl.`bar` NOT LIKE '%__debezium_unavailable_value%', TRUE)", + DatabricksDialect{}.BuildIsNotToastValueExpression("tbl", columns.NewColumn("bar", typing.String)), + ) + } +} + func TestDatabricksDialect_BuildCreateTableQuery(t *testing.T) { fakeTableID := &mocks.FakeTableIdentifier{} fakeTableID.FullyQualifiedNameReturns("{TABLE}") diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 74d86b2c..02df68b9 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -122,7 +122,7 @@ func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizatio // Copy file from DBFS -> table via COPY INTO, ref: https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html // We'll need \\\\N here because we need to string escape. - copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath()) + copyCommand := fmt.Sprintf(`COPY INTO %s BY POSITION FROM '%s' FILEFORMAT = CSV FORMAT_OPTIONS ('escape' = '"', 'delimiter' = '\t', 'header' = 'false', 'nullValue' = '\\\\N')`, tempTableID.FullyQualifiedName(), file.DBFSFilePath()) if _, err = s.ExecContext(ctx, copyCommand); err != nil { return fmt.Errorf("failed to run COPY INTO for temporary table: %w", err) }