diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index 9359e95a4..5f120c053 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -49,6 +49,7 @@ func (md MSSQLDialect) BuildIsNotToastValueExpression(tableAlias constants.Table if column.KindDetails == typing.Struct { return fmt.Sprintf("COALESCE(%s, {}) != {'key': '%s'}", colName, constants.ToastUnavailableValuePlaceholder) } + return fmt.Sprintf("COALESCE(%s, '') != '%s'", colName, constants.ToastUnavailableValuePlaceholder) } diff --git a/clients/mssql/store.go b/clients/mssql/store.go index ebdd0e330..72d220c61 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -2,6 +2,7 @@ package mssql import ( "context" + "fmt" "strings" _ "github.com/microsoft/go-mssqldb" @@ -83,7 +84,7 @@ func (s *Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTab } func LoadStore(cfg config.Config) (*Store, error) { - store, err := db.Open("mssql", cfg.MSSQL.DSN()) + store, err := db.Open("mssql", fmt.Sprintf("%s&encrypt=disable", cfg.MSSQL.DSN())) if err != nil { return nil, err } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2d4fb20d2..2aadf03be 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -151,6 +151,8 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi return fmt.Errorf("failed to generate merge statements: %w", err) } + fmt.Println("mergeStatements", mergeStatements) + if err = destination.ExecStatements(dwh, mergeStatements); err != nil { return fmt.Errorf("failed to execute merge statements: %w", err) } diff --git a/models/event/event.go b/models/event/event.go index 9dc1ba7b6..a56125040 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -224,8 +224,22 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali toastedCol = true } } + + valArray, isOk := val.([]any) + if isOk { + if len(valArray) == 1 { + if _, isOk = valArray[0].(string); isOk { + if valArray[0] == constants.ToastUnavailableValuePlaceholder { + val = constants.ToastUnavailableValuePlaceholder + toastedCol = true + } + } + } + } } + fmt.Println("toastedCol", toastedCol, "val", val) + if toastedCol { err := inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ ToastCol: typing.ToPtr(true),