diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2c34cc05b..341c3879e 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -6,7 +6,6 @@ import ( "log/slog" "time" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" @@ -44,25 +43,15 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi } } else { if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil { - return fmt.Errorf("failed to alter table: %w", err) + return fmt.Errorf("failed to add columns for table %q: %w", tableID.Table(), err) } } - // Keys that exist in DWH, but not in our CDC stream. - deleteAlterTableArgs := ddl.AlterTableArgs{ - Dialect: dwh.Dialect(), - Tc: tableConfig, - TableID: tableID, - ColumnOp: constants.Delete, - ContainOtherOperations: tableData.ContainOtherOperations(), - CdcTime: tableData.LatestCDCTs, - Mode: tableData.Mode(), - } - - if err = deleteAlterTableArgs.AlterTable(dwh, srcKeysMissing...); err != nil { - return fmt.Errorf("failed to apply alter table: %w", err) + if err = AlterTableDropColumns(ctx, dwh, tableConfig, tableID, srcKeysMissing, tableData.LatestCDCTs, tableData.ContainOtherOperations()); err != nil { + return fmt.Errorf("failed to drop columns for table %q: %w", tableID.Table(), err) } + // TODO: Examine whether [AuditColumnsToDelete] still needs to be called. 
tableConfig.AuditColumnsToDelete(srcKeysMissing) if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) diff --git a/clients/shared/table.go b/clients/shared/table.go index 3a1fbeac7..e92ab6739 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" @@ -61,3 +62,35 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc tc.MutateInMemoryColumns(constants.Add, colsToAdd...) return nil } + +func AlterTableDropColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column, cdcTime time.Time, containOtherOperations bool) error { + if len(cols) == 0 { + return nil + } + + var colsToDrop []columns.Column + for _, col := range cols { + if tc.ShouldDeleteColumn(col.Name(), cdcTime, containOtherOperations) { + colsToDrop = append(colsToDrop, col) + } + } + + if len(colsToDrop) == 0 { + return nil + } + + for _, colToDrop := range colsToDrop { + query, err := ddl.BuildAlterTableDropColumns(dwh.Dialect(), tableID, colToDrop) + if err != nil { + return fmt.Errorf("failed to build alter table drop columns: %w", err) + } + + slog.Info("[DDL] Executing query", slog.String("query", query)) + if _, err = dwh.ExecContext(ctx, query); err != nil { + return fmt.Errorf("failed to alter table: %w", err) + } + } + + tc.MutateInMemoryColumns(constants.Delete, colsToDrop...) 
+ return nil +} diff --git a/lib/destination/ddl/ddl.go b/lib/destination/ddl/ddl.go index 8546f4a36..326696ee4 100644 --- a/lib/destination/ddl/ddl.go +++ b/lib/destination/ddl/ddl.go @@ -4,17 +4,19 @@ import ( "fmt" "log/slog" "strings" - "time" bigQueryDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" - "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing/columns" ) +func shouldCreatePrimaryKey(col columns.Column, mode config.Mode, createTable bool) bool { + return col.PrimaryKey() && mode == config.Replication && createTable +} + func BuildCreateTableSQL(dialect sql.Dialect, tableIdentifier sql.TableIdentifier, temporaryTable bool, mode config.Mode, columns []columns.Column) (string, error) { if len(columns) == 0 { return "", fmt.Errorf("no columns provided") @@ -83,87 +85,10 @@ func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier, return parts, nil } -type AlterTableArgs struct { - Dialect sql.Dialect - Tc *types.DwhTableConfig - // ContainsOtherOperations - this is sourced from tableData `containOtherOperations` - ContainOtherOperations bool - TableID sql.TableIdentifier - ColumnOp constants.ColumnOperation - Mode config.Mode - CdcTime time.Time -} - -func (a AlterTableArgs) Validate() error { - if a.Dialect == nil { - return fmt.Errorf("dialect cannot be nil") - } - - if !(a.Mode == config.History || a.Mode == config.Replication) { - return fmt.Errorf("unexpected mode: %s", a.Mode.String()) - } - - return nil -} - -func shouldCreatePrimaryKey(col columns.Column, mode config.Mode, createTable bool) bool { - return col.PrimaryKey() && mode == config.Replication && createTable -} - -func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []columns.Column) { - var 
mutateCol []columns.Column - // It's okay to combine since args.ColumnOp only takes one of: `Delete` or `Add` - var colSQLParts []string - for _, col := range cols { - if col.ShouldSkip() { - // Let's not modify the table if the column kind is invalid - continue - } - - if a.ColumnOp == constants.Delete { - if !a.Tc.ShouldDeleteColumn(col.Name(), a.CdcTime, a.ContainOtherOperations) { - continue - } - } - - mutateCol = append(mutateCol, col) - switch a.ColumnOp { - case constants.Add: - colSQLParts = append(colSQLParts, fmt.Sprintf("%s %s", a.Dialect.QuoteIdentifier(col.Name()), a.Dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey()))) - case constants.Delete: - colSQLParts = append(colSQLParts, a.Dialect.QuoteIdentifier(col.Name())) - } - } - - var alterStatements []string - for _, colSQLPart := range colSQLParts { - alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart)) - } - - return alterStatements, mutateCol -} - -func (a AlterTableArgs) AlterTable(dwh destination.DataWarehouse, cols ...columns.Column) error { - if err := a.Validate(); err != nil { - return err - } - - if len(cols) == 0 { - return nil - } - - alterStatements, mutateCol := a.buildStatements(cols...) - for _, sqlQuery := range alterStatements { - slog.Info("DDL - executing sql", slog.String("query", sqlQuery)) - if _, err := dwh.Exec(sqlQuery); err != nil { - if !a.Dialect.IsColumnAlreadyExistsErr(err) { - return fmt.Errorf("failed to apply ddl, sql: %q, err: %w", sqlQuery, err) - } - } +func BuildAlterTableDropColumns(dialect sql.Dialect, tableID sql.TableIdentifier, col columns.Column) (string, error) { + if col.ShouldSkip() { + return "", fmt.Errorf("received an invalid column %q", col.Name()) } - // createTable = false since it all successfully updated. - a.Tc.MutateInMemoryColumns(a.ColumnOp, mutateCol...) 
- - return nil + return dialect.BuildAlterColumnQuery(tableID, constants.Delete, dialect.QuoteIdentifier(col.Name())), nil } diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index 666ade07d..61c3981f5 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -1,15 +1,16 @@ package ddl_test import ( + "context" "fmt" "strings" "time" "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" @@ -59,17 +60,8 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Snowflake for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: snowflakeTc, - TableID: snowflakeTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column)) + err := shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts, true) + assert.NoError(d.T(), err) } // Never actually deleted. 
@@ -78,18 +70,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // BigQuery for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: bqTc, - TableID: bqTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - err := alterTableArgs.AlterTable(d.bigQueryStore, column) - assert.NoError(d.T(), err) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, bqTc, bqTableID, []columns.Column{column}, ts, true)) } // Never actually deleted. @@ -98,17 +79,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Redshift for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.redshiftStore.Dialect(), - Tc: redshiftTc, - TableID: redshiftTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.redshiftStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.redshiftStore, redshiftTc, redshiftTableID, []columns.Column{column}, ts, true)) } // Never actually deleted. @@ -131,17 +102,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: snowflakeTc, - TableID: snowflakeTableID, - ContainOtherOperations: false, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts, false)) } // Never actually deleted. 
@@ -150,17 +111,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // BigQuery for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: bqTc, - TableID: bqTableID, - ContainOtherOperations: false, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, bqTc, bqTableID, []columns.Column{column}, ts, false)) } // Never actually deleted. @@ -169,21 +120,11 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Redshift for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.redshiftStore.Dialect(), - Tc: redshiftTc, - TableID: redshiftTableID, - ContainOtherOperations: false, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.redshiftStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.redshiftStore, redshiftTc, redshiftTableID, []columns.Column{column}, ts, false)) } // Never actually deleted. - assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete()) + assert.Empty(d.T(), redshiftTc.ReadOnlyColumnsToDelete()) assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength) // 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp. @@ -203,49 +144,21 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) // Now, actually try to delete. 
- // Snowflake - for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: snowflakeTc, - TableID: snowflakeTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, + { + // Snowflake + for _, column := range cols.GetColumns() { + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts, true)) } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column)) } // BigQuery for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: bqTc, - TableID: bqTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, bqTc, bqTableID, []columns.Column{column}, ts, true)) } // Redshift for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.redshiftStore.Dialect(), - Tc: redshiftTc, - TableID: redshiftTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.redshiftStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.redshiftStore, redshiftTc, redshiftTableID, []columns.Column{column}, ts, true)) } // Nothing has been deleted, but it is all added to the permissions table. 
@@ -258,43 +171,12 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { assert.Len(d.T(), snowflakeTc.ReadOnlyColumnsToDelete(), originalColumnLength) for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: snowflakeTc, - TableID: snowflakeTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, column)) - + // Snowflake + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, snowflakeTc, snowflakeTableID, []columns.Column{column}, ts.Add(2*constants.DeletionConfidencePadding), true)) // BigQuery - alterTableArgs = ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: bqTc, - TableID: bqTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) - + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, bqTc, bqTableID, []columns.Column{column}, ts.Add(2*constants.DeletionConfidencePadding), true)) // Redshift - alterTableArgs = ddl.AlterTableArgs{ - Dialect: d.redshiftStore.Dialect(), - Tc: redshiftTc, - TableID: redshiftTableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.redshiftStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.redshiftStore, redshiftTc, redshiftTableID, []columns.Column{column}, ts.Add(2*constants.DeletionConfidencePadding), true)) } // Everything has been deleted. 
diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index e377eb7f5..9c68d73eb 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -1,6 +1,7 @@ package ddl_test import ( + "context" "database/sql" "errors" "fmt" @@ -9,9 +10,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/artie-labs/transfer/clients/bigquery/dialect" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" @@ -47,19 +48,9 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete - assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) + assert.Empty(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Delete, - ContainOtherOperations: true, - CdcTime: ts, - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column}, ts, true)) } // Have not deleted, but tried to! @@ -70,18 +61,8 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { // Now try to delete again and with an increased TS. It should now be all deleted. 
var callIdx int for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Delete, - ContainOtherOperations: true, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) - query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column}, ts.Add(2*constants.DeletionConfidencePadding), true)) + _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s drop COLUMN %s", fqName, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name())), query) callIdx += 1 } @@ -90,13 +71,12 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) // Columns have not been deleted yet. 
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), 0) - assert.Equal(d.T(), originalColumnLength, d.fakeBigQueryStore.ExecCallCount()) + assert.Equal(d.T(), originalColumnLength, d.fakeBigQueryStore.ExecContextCallCount()) } func (d *DDLTestSuite) TestAlterTableAddColumns() { tableID := dialect.NewTableIdentifier("", "mock_dataset", "add_cols") fqName := tableID.FullyQualifiedName() - ts := time.Now() existingColNameToKindDetailsMap := map[string]typing.KindDetails{ "foo": typing.String, "bar": typing.String, @@ -125,19 +105,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { var callIdx int tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) for name, kind := range newCols { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: ts, - Mode: config.Replication, - } - col := columns.NewColumn(name, kind) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{col})) - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, col)) - query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) + _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(col.Name()), d.bigQueryStore.Dialect().DataTypeForKind(kind, false)), query) callIdx += 1 @@ -161,7 +132,6 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { tableID := dialect.NewTableIdentifier("", "mock_dataset", "add_cols") fqName := tableID.FullyQualifiedName() - ts := time.Now() existingColNameToKindDetailsMap := map[string]typing.KindDetails{ "foo": typing.String, "bar": typing.String, @@ -184,18 +154,10 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { for _, column := 
range existingCols.GetColumns() { var sqlResult sql.Result // BQ returning the same error because the column already exists. - d.fakeBigQueryStore.ExecReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]")) - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: ts, - Mode: config.Replication, - } + d.fakeBigQueryStore.ExecContextReturnsOnCall(0, sqlResult, errors.New("Column already exists: _string at [1:39]")) - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) - query, _ := d.fakeBigQueryStore.ExecArgsForCall(callIdx) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column})) + _, query, _ := d.fakeBigQueryStore.ExecContextArgsForCall(callIdx) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s %s COLUMN %s %s", fqName, constants.Add, d.bigQueryStore.Dialect().QuoteIdentifier(column.Name()), d.bigQueryStore.Dialect().DataTypeForKind(column.KindDetails, false)), query) callIdx += 1 @@ -230,43 +192,22 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { } tableID := d.bigQueryStore.IdentifierFor(td.TopicConfig(), td.Name()) - originalColumnLength := len(columnNameToKindDetailsMap) d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(cols.GetColumns(), false)) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) // Prior to deletion, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Delete, - CdcTime: ts, - Mode: config.Replication, - } - assert.NoError(d.T(), 
alterTableArgs.AlterTable(d.bigQueryStore, column)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column}, ts, false)) } - assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())) - assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength) + // Because containsOtherOperations is false, it should have never tried to delete. + assert.Equal(d.T(), 0, d.fakeBigQueryStore.ExecCallCount()) - // Now try to delete again and with an increased TS. It should now be all deleted. + // Timestamp got increased, but containsOtherOperations is false, so it should not have tried to delete. for _, column := range cols.GetColumns() { - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.bigQueryStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Delete, - CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.bigQueryStore, column)) - assert.Equal(d.T(), 0, d.fakeBigQueryStore.ExecCallCount()) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.bigQueryStore, tc, tableID, []columns.Column{column}, ts.Add(2*constants.DeletionConfidencePadding), false)) } - // Columns still exist - assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())) - assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength) + assert.Equal(d.T(), 0, d.fakeBigQueryStore.ExecCallCount()) } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 0bf3b1744..feae06910 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -1,16 +1,17 @@ package ddl_test import ( + "context" "errors" "fmt" + "slices" "time" 
"github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/clients/snowflake/dialect" - "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/destination/ddl" "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -27,25 +28,15 @@ func (d *DDLTestSuite) TestAlterComplexObjects() { tableID := dialect.NewTableIdentifier("shop", "public", "complex_columns") d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) for i := 0; i < len(cols); i++ { - execQuery, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) + _, execQuery, _ := d.fakeSnowflakeStagesStore.ExecContextArgsForCall(i) assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN %s %s", `shop.public."COMPLEX_COLUMNS"`, d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), d.snowflakeStagesStore.Dialect().DataTypeForKind(cols[i].KindDetails, false)), execQuery) } - assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecCallCount(), "called SFLK the same amt to create cols") + assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") } func (d *DDLTestSuite) TestAlterIdempotency() { @@ -61,20 +52,12 @@ func (d *DDLTestSuite) TestAlterIdempotency() { tc := 
d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("column 'order_name' already exists")) - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - Mode: config.Replication, - } - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) - assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecCallCount(), "called SFLK the same amt to create cols") + assert.NoError(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") - d.fakeSnowflakeStagesStore.ExecReturns(nil, errors.New("table does not exist")) - assert.ErrorContains(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...), "failed to apply ddl") + d.fakeSnowflakeStagesStore.ExecContextReturns(nil, errors.New("table does not exist")) + assert.ErrorContains(d.T(), shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols), `failed to alter table: table does not exist`) } func (d *DDLTestSuite) TestAlterTableAdd() { @@ -90,17 +73,8 @@ func (d *DDLTestSuite) TestAlterTableAdd() { d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - Mode: config.Replication, - } - - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) - assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecCallCount(), "called SFLK the same amt to create cols") + assert.NoError(d.T(), 
shared.AlterTableAddColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols)) + assert.Equal(d.T(), len(cols), d.fakeSnowflakeStagesStore.ExecContextCallCount(), "called SFLK the same amt to create cols") // Check the table config tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) @@ -129,17 +103,8 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { tableID := dialect.NewTableIdentifier("shop", "public", "users") d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true)) tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: tc, - TableID: tableID, - ContainOtherOperations: true, - ColumnOp: constants.Delete, - CdcTime: time.Now().UTC(), - Mode: config.Replication, - } - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols, time.Now().UTC(), true)) assert.Equal(d.T(), 0, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete, but not yet.") // Check the table config @@ -164,10 +129,10 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { // Now let's actually try to dial the time back, and it should actually try to delete. 
tableConfig.AddColumnsToDelete(colToActuallyDelete, time.Now().Add(-1*time.Hour)) - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) - assert.Equal(d.T(), i+1, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete one column") + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols, time.Now().UTC(), true)) + assert.Equal(d.T(), i+1, d.fakeSnowflakeStagesStore.ExecContextCallCount(), "tried to delete one column") - execArg, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(i) + _, execArg, _ := d.fakeSnowflakeStagesStore.ExecContextArgsForCall(i) assert.Equal(d.T(), execArg, fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", `shop.public."USERS"`, constants.Delete, d.snowflakeStagesStore.Dialect().QuoteIdentifier(cols[i].Name()), )) @@ -175,7 +140,6 @@ func (d *DDLTestSuite) TestAlterTableDeleteDryRun() { } func (d *DDLTestSuite) TestAlterTableDelete() { - // Test adding a bunch of columns cols := []columns.Column{ columns.NewColumn("created_at", typing.TimestampTZ), columns.NewColumn("id", typing.Integer), @@ -187,40 +151,37 @@ func (d *DDLTestSuite) TestAlterTableDelete() { tableID := dialect.NewTableIdentifier("shop", "public", "users1") tableCfg := types.NewDwhTableConfig(nil, true) - tableCfg.SetColumnsToDelete(map[string]time.Time{ + colsToDeleteMap := map[string]time.Time{ "col_to_delete": time.Now().Add(-2 * constants.DeletionConfidencePadding), "answers": time.Now().Add(-2 * constants.DeletionConfidencePadding), "start": time.Now().Add(-2 * constants.DeletionConfidencePadding), - }) - + } + tableCfg.SetColumnsToDelete(colsToDeleteMap) d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, tableCfg) - tc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - alterTableArgs := ddl.AlterTableArgs{ - Dialect: d.snowflakeStagesStore.Dialect(), - Tc: tc, - TableID: tableID, - ColumnOp: constants.Delete, - ContainOtherOperations: true, - CdcTime: 
time.Now(), - Mode: config.Replication, + { + // containsOtherOperations = false + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols, time.Now(), false)) + // Nothing got deleted + assert.Equal(d.T(), 0, d.fakeSnowflakeStagesStore.ExecContextCallCount()) + assert.Equal(d.T(), colsToDeleteMap, tc.ReadOnlyColumnsToDelete()) } + { + // containsOtherOperations = true + assert.NoError(d.T(), shared.AlterTableDropColumns(context.Background(), d.snowflakeStagesStore, tc, tableID, cols, time.Now(), true)) + assert.Equal(d.T(), 3, d.fakeSnowflakeStagesStore.ExecContextCallCount()) - assert.NoError(d.T(), alterTableArgs.AlterTable(d.snowflakeStagesStore, cols...)) - assert.Equal(d.T(), 3, d.fakeSnowflakeStagesStore.ExecCallCount(), "tried to delete, but not yet.") + // Check the table config + tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - // Check the table config - tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - for col := range tableConfig.ReadOnlyColumnsToDelete() { - var found bool - for _, expCol := range cols { - if found = col == expCol.Name(); found { - break - } + var colsToDelete []string + for col := range tableConfig.ReadOnlyColumnsToDelete() { + colsToDelete = append(colsToDelete, col) } - assert.True(d.T(), found, - fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", - col, tableConfig.ReadOnlyColumnsToDelete(), cols)) + // Cols that should have been deleted, have been. The rest are still there in reserve.
+ assert.Len(d.T(), colsToDelete, 3) + slices.Sort(colsToDelete) + assert.Equal(d.T(), []string{"created_at", "id", "name"}, colsToDelete) } } diff --git a/lib/destination/ddl/ddl_test.go b/lib/destination/ddl/ddl_test.go index 102dfc4dc..19eca3094 100644 --- a/lib/destination/ddl/ddl_test.go +++ b/lib/destination/ddl/ddl_test.go @@ -8,11 +8,40 @@ import ( bqDialect "github.com/artie-labs/transfer/clients/bigquery/dialect" "github.com/artie-labs/transfer/clients/redshift/dialect" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) +func TestShouldCreatePrimaryKey(t *testing.T) { + pk := columns.NewColumn("foo", typing.String) + pk.SetPrimaryKeyForTest(true) + { + // Primary key check + { + // Column is not a primary key + col := columns.NewColumn("foo", typing.String) + assert.False(t, shouldCreatePrimaryKey(col, config.Replication, true)) + } + { + // Column is a primary key + assert.True(t, shouldCreatePrimaryKey(pk, config.Replication, true)) + } + } + { + // False because it's history mode + // It should be false because we are appending rows to this table. 
+ assert.False(t, shouldCreatePrimaryKey(pk, config.History, true)) + } + { + // False because it's not a create table operation + assert.False(t, shouldCreatePrimaryKey(pk, config.Replication, false)) + } + { + // True because it's a primary key, replication mode, and create table operation + assert.True(t, shouldCreatePrimaryKey(pk, config.Replication, true)) + } +} + func TestBuildCreateTableSQL(t *testing.T) { { // No columns provided @@ -118,56 +147,16 @@ func TestBuildAlterTableAddColumns(t *testing.T) { } } -func TestAlterTableArgs_Validate(t *testing.T) { - { - // Invalid - a := AlterTableArgs{ - ColumnOp: constants.Delete, - Mode: config.Replication, - } - { - // Dialect isn't specified - assert.ErrorContains(t, a.Validate(), "dialect cannot be nil") - } - } - { - // Valid - a := AlterTableArgs{ - ColumnOp: constants.Add, - Mode: config.Replication, - Dialect: bqDialect.BigQueryDialect{}, - } - - assert.NoError(t, a.Validate()) - } -} - -func TestShouldCreatePrimaryKey(t *testing.T) { - pk := columns.NewColumn("foo", typing.String) - pk.SetPrimaryKeyForTest(true) - { - // Primary key check - { - // Column is not a primary key - col := columns.NewColumn("foo", typing.String) - assert.False(t, shouldCreatePrimaryKey(col, config.Replication, true)) - } - { - // Column is a primary key - assert.True(t, shouldCreatePrimaryKey(pk, config.Replication, true)) - } - } +func TestAlterTableDropColumns(t *testing.T) { { - // False because it's history mode - // It should be false because we are appending rows to this table. 
- assert.False(t, shouldCreatePrimaryKey(pk, config.History, true)) - } - { - // False because it's not a create table operation - assert.False(t, shouldCreatePrimaryKey(pk, config.Replication, false)) + // Invalid column + _, err := BuildAlterTableDropColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), columns.NewColumn("invalid", typing.Invalid)) + assert.ErrorContains(t, err, `received an invalid column "invalid"`) } { - // True because it's a primary key, replication mode, and create table operation - assert.True(t, shouldCreatePrimaryKey(pk, config.Replication, true)) + // Valid column + sql, err := BuildAlterTableDropColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), columns.NewColumn("dusty", typing.String)) + assert.NoError(t, err) + assert.Equal(t, `ALTER TABLE schema."table" drop COLUMN "dusty"`, sql) } }