Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 15, 2024
1 parent d461370 commit 3bf2fcf
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 26 deletions.
19 changes: 4 additions & 15 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
Expand Down Expand Up @@ -44,25 +43,15 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
}
} else {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
return fmt.Errorf("failed to add columns for table %q: %w", tableID.Table(), err)
}
}

// Keys that exist in DWH, but not in our CDC stream.
deleteAlterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
ColumnOp: constants.Delete,
ContainOtherOperations: tableData.ContainOtherOperations(),
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

if err = deleteAlterTableArgs.AlterTable(dwh, srcKeysMissing...); err != nil {
return fmt.Errorf("failed to apply alter table: %w", err)
if err = AlterTableDropColumns(ctx, dwh, tableConfig, tableID, srcKeysMissing, tableData.LatestCDCTs, tableData.ContainOtherOperations()); err != nil {
return fmt.Errorf("failed to drop columns for table %q: %w", tableID.Table(), err)
}

// TODO: Examine whether [AuditColumnsToDelete] still needs to be called.
tableConfig.AuditColumnsToDelete(srcKeysMissing)
if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
Expand Down
33 changes: 33 additions & 0 deletions clients/shared/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
Expand Down Expand Up @@ -61,3 +62,35 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc
tc.MutateInMemoryColumns(constants.Add, colsToAdd...)
return nil
}

func AlterTableDropColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column, cdcTime time.Time, containOtherOperations bool) error {
if len(cols) == 0 {
return nil
}

var colsToDrop []columns.Column
for _, col := range cols {
if tc.ShouldDeleteColumn(col.Name(), cdcTime, containOtherOperations) {
colsToDrop = append(colsToDrop, col)
}
}

if len(colsToDrop) == 0 {
return nil
}

for _, colToDrop := range colsToDrop {
query, err := ddl.BuildAlterTableDropColumns(dwh.Dialect(), tableID, colToDrop)
if err != nil {
return fmt.Errorf("failed to build alter table drop columns: %w", err)
}

slog.Info("[DDL] Executing query", slog.String("query", query))
if _, err = dwh.ExecContext(ctx, query); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}

tc.MutateInMemoryColumns(constants.Delete, colsToDrop...)
return nil
}
8 changes: 8 additions & 0 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier,
return parts, nil
}

func BuildAlterTableDropColumns(dialect sql.Dialect, tableID sql.TableIdentifier, col columns.Column) (string, error) {
if col.ShouldSkip() {
return "", fmt.Errorf("received an invalid column %q", col.Name())
}

return dialect.BuildAlterColumnQuery(tableID, constants.Delete, dialect.QuoteIdentifier(col.Name())), nil
}

type AlterTableArgs struct {
Dialect sql.Dialect
Tc *types.DwhTableConfig
Expand Down
16 changes: 5 additions & 11 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package ddl_test

import (
"context"
"fmt"
"strings"
"time"

"github.com/artie-labs/transfer/clients/shared"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
Expand Down Expand Up @@ -59,17 +62,8 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Snowflake
for _, column := range cols.GetColumns() {
alterTableArgs := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: snowflakeTableID,
ContainOtherOperations: true,
ColumnOp: constants.Delete,
CdcTime: ts,
Mode: config.Replication,
}

assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column))
err := shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts, true)
assert.NoError(d.T(), err)
}

// Never actually deleted.
Expand Down

0 comments on commit 3bf2fcf

Please sign in to comment.