Skip to content

Commit

Permalink
Toasted array values.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 22, 2024
1 parent 4344149 commit c78ff93
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions clients/mssql/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table
if column.KindDetails == typing.Struct {
return fmt.Sprintf("COALESCE(%s, {}) != {'key': '%s'}", colName, constants.ToastUnavailableValuePlaceholder)
}

return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder)
}

Expand Down
3 changes: 2 additions & 1 deletion clients/mssql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mssql

import (
"context"
"fmt"
"strings"

_ "github.com/microsoft/go-mssqldb"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab
}

func LoadStore(cfg config.Config) (*Store, error) {
store, err := db.Open("mssql", cfg.MSSQL.DSN())
store, err := db.Open("mssql", fmt.Sprintf("%s&encrypt=disable", cfg.MSSQL.DSN()))
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return fmt.Errorf("failed to generate merge statements: %w", err)
}

fmt.Println("mergeStatements", mergeStatements)

if err = destination.ExecStatements(dwh, mergeStatements); err != nil {
return fmt.Errorf("failed to execute merge statements: %w", err)
}
Expand Down
14 changes: 14 additions & 0 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,22 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
toastedCol = true
}
}

valArray, isOk := val.([]any)
if isOk {
if len(valArray) == 1 {
if _, isOk = valArray[0].(string); isOk {
if valArray[0] == constants.ToastUnavailableValuePlaceholder {
val = constants.ToastUnavailableValuePlaceholder
toastedCol = true
}
}
}
}
}

fmt.Println("toastedCol", toastedCol, "val", val)

if toastedCol {
err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{
ToastCol: typing.ToPtr(true),
Expand Down

0 comments on commit c78ff93

Please sign in to comment.