Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 12, 2024
1 parent d93bf20 commit 9770ca0
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
10 changes: 6 additions & 4 deletions clients/bigquery/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool {

func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string {
colName := sql.QuoteTableAliasColumn(tableAlias, column, bd)
if column.KindDetails == typing.Struct {
return fmt.Sprintf(`COALESCE(TO_JSON_STRING(%s) != '{"key":"%s"}', true)`,
colName, constants.ToastUnavailableValuePlaceholder)

switch column.KindDetails {
case typing.Struct, typing.Array:
return fmt.Sprintf(`TO_JSON_STRING(%s) NOT LIKE '%s'`, colName, "%"+constants.ToastUnavailableValuePlaceholder+"%")
default:
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}
return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder)
}

func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string {
Expand Down
6 changes: 6 additions & 0 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
if err != nil {
return nil, err
}

fmt.Println("Type of values", fmt.Sprintf("%T", values))
for _, val := range values {
fmt.Println("value", val)
}

list := message.Mutable(field).List()
for _, val := range values {
list.Append(protoreflect.ValueOfString(val))
Expand Down
2 changes: 2 additions & 0 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return fmt.Errorf("failed to generate merge statements: %w", err)
}

fmt.Println("mergeStatements", mergeStatements)

if err = destination.ExecStatements(dwh, mergeStatements); err != nil {
return fmt.Errorf("failed to execute merge statements: %w", err)
}
Expand Down

0 comments on commit 9770ca0

Please sign in to comment.