From 3bf2fcf619800ab3af265b0d8ce690ad39e2a570 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 15 Nov 2024 15:22:31 -0800 Subject: [PATCH] WIP. --- clients/shared/merge.go | 19 +++-------- clients/shared/table.go | 33 ++++++++++++++++++++ lib/destination/ddl/ddl.go | 8 +++++ lib/destination/ddl/ddl_alter_delete_test.go | 16 +++------- 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2c34cc05b..341c3879e 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -6,7 +6,6 @@ import ( "log/slog" "time" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" @@ -44,25 +43,15 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi } } else { if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil { - return fmt.Errorf("failed to alter table: %w", err) + return fmt.Errorf("failed to add columns for table %q: %w", tableID.Table(), err) } } - // Keys that exist in DWH, but not in our CDC stream. - deleteAlterTableArgs := ddl.AlterTableArgs{ - Dialect: dwh.Dialect(), - Tc: tableConfig, - TableID: tableID, - ColumnOp: constants.Delete, - ContainOtherOperations: tableData.ContainOtherOperations(), - CdcTime: tableData.LatestCDCTs, - Mode: tableData.Mode(), - } - - if err = deleteAlterTableArgs.AlterTable(dwh, srcKeysMissing...); err != nil { - return fmt.Errorf("failed to apply alter table: %w", err) + if err = AlterTableDropColumns(ctx, dwh, tableConfig, tableID, srcKeysMissing, tableData.LatestCDCTs, tableData.ContainOtherOperations()); err != nil { + return fmt.Errorf("failed to drop columns for table %q: %w", tableID.Table(), err) } + // TODO: Examine whether [AuditColumnsToDelete] still needs to be called. tableConfig.AuditColumnsToDelete(srcKeysMissing) if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) diff --git a/clients/shared/table.go b/clients/shared/table.go index 3a1fbeac7..e92ab6739 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" @@ -61,3 +62,35 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc tc.MutateInMemoryColumns(constants.Add, colsToAdd...) return nil } + +func AlterTableDropColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column, cdcTime time.Time, containOtherOperations bool) error { + if len(cols) == 0 { + return nil + } + + var colsToDrop []columns.Column + for _, col := range cols { + if tc.ShouldDeleteColumn(col.Name(), cdcTime, containOtherOperations) { + colsToDrop = append(colsToDrop, col) + } + } + + if len(colsToDrop) == 0 { + return nil + } + + for _, colToDrop := range colsToDrop { + query, err := ddl.BuildAlterTableDropColumns(dwh.Dialect(), tableID, colToDrop) + if err != nil { + return fmt.Errorf("failed to build alter table drop columns: %w", err) + } + + slog.Info("[DDL] Executing query", slog.String("query", query)) + if _, err = dwh.ExecContext(ctx, query); err != nil { + return fmt.Errorf("failed to alter table: %w", err) + } + } + + tc.MutateInMemoryColumns(constants.Delete, colsToDrop...) + return nil +} diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 8546f4a36..92ca91ced 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -83,6 +83,14 @@ func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier, return parts, nil } +func BuildAlterTableDropColumns(dialect sql.Dialect, tableID sql.TableIdentifier, col columns.Column) (string, error) { + if col.ShouldSkip() { + return "", fmt.Errorf("received an invalid column %q", col.Name()) + } + + return dialect.BuildAlterColumnQuery(tableID, constants.Delete, dialect.QuoteIdentifier(col.Name())), nil +} + type AlterTableArgs struct { Dialect sql.Dialect Tc *types.DwhTableConfig diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index 666ade07d..72d764ed2 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -1,10 +1,13 @@ package ddl_test import ( + "context" "fmt" "strings" "time" + "github.com/artie-labs/transfer/clients/shared" + "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/lib/config" @@ -59,17 +62,8 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Snowflake for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: snowflakeTc, - TableID: snowflakeTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column)) + err := shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts, true) + assert.NoError(d.T(), err) } // Never actually deleted.