Skip to content

Commit

Permalink
[DDL] Standing up AlterTableAddColumns (#1049)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 15, 2024
1 parent 8da8b57 commit d461370
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 25 deletions.
15 changes: 1 addition & 14 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"fmt"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing/columns"
Expand Down Expand Up @@ -38,20 +36,9 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
return fmt.Errorf("failed to create table: %w", err)
}
} else {
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Keys that exist in CDC stream, but not in DWH
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}

}

if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
Expand Down
12 changes: 1 addition & 11 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return fmt.Errorf("failed to create table: %w", err)
}
} else {
alterTableArgs := ddl.AlterTableArgs{
Dialect: dwh.Dialect(),
Tc: tableConfig,
TableID: tableID,
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
Mode: tableData.Mode(),
}

// Columns that are missing in DWH, but exist in our CDC stream.
if err = alterTableArgs.AlterTable(dwh, targetKeysMissing...); err != nil {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}
Expand Down
33 changes: 33 additions & 0 deletions clients/shared/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, tableID sql.TableIdentifier, tempTable bool) error {
Expand All @@ -28,3 +29,35 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *
tc.MutateInMemoryColumns(constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...)
return nil
}

func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column) error {
if len(cols) == 0 {
return nil
}

var colsToAdd []columns.Column
for _, col := range cols {
if col.ShouldSkip() {
continue
}

colsToAdd = append(colsToAdd, col)
}

sqlParts, err := ddl.BuildAlterTableAddColumns(dwh.Dialect(), tableID, colsToAdd)
if err != nil {
return fmt.Errorf("failed to build alter table add columns: %w", err)
}

for _, sqlPart := range sqlParts {
slog.Info("[DDL] Executing query", slog.String("query", sqlPart))
if _, err = dwh.ExecContext(ctx, sqlPart); err != nil {
if !dwh.Dialect().IsColumnAlreadyExistsErr(err) {
return fmt.Errorf("failed to alter table: %w", err)
}
}
}

tc.MutateInMemoryColumns(constants.Add, colsToAdd...)
return nil
}
14 changes: 14 additions & 0 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ func DropTemporaryTable(dwh destination.DataWarehouse, tableIdentifier sql.Table
return nil
}

func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier, cols []columns.Column) ([]string, error) {
var parts []string
for _, col := range cols {
if col.ShouldSkip() {
return nil, fmt.Errorf("received an invalid column %q", col.Name())
}

sqlPart := fmt.Sprintf("%s %s", dialect.QuoteIdentifier(col.Name()), dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey()))
parts = append(parts, dialect.BuildAlterColumnQuery(tableID, constants.Add, sqlPart))
}

return parts, nil
}

type AlterTableArgs struct {
Dialect sql.Dialect
Tc *types.DwhTableConfig
Expand Down
42 changes: 42 additions & 0 deletions lib/destination/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,48 @@ func TestBuildCreateTableSQL(t *testing.T) {
}
}

func TestBuildAlterTableAddColumns(t *testing.T) {
{
// No columns
sqlParts, err := BuildAlterTableAddColumns(nil, nil, []columns.Column{})
assert.NoError(t, err)
assert.Empty(t, sqlParts)
}
{
// One column to add
col := columns.NewColumn("dusty", typing.String)
sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col})
assert.NoError(t, err)
assert.Len(t, sqlParts, 1)
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "dusty" VARCHAR(MAX)`, sqlParts[0])
}
{
// Two columns, one invalid, it will error.
col := columns.NewColumn("dusty", typing.String)
_, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"),
[]columns.Column{
col,
columns.NewColumn("invalid", typing.Invalid),
},
)

assert.ErrorContains(t, err, `received an invalid column "invalid"`)
}
{
// Three columns to add
col1 := columns.NewColumn("aussie", typing.String)
col2 := columns.NewColumn("doge", typing.String)
col3 := columns.NewColumn("age", typing.Integer)

sqlParts, err := BuildAlterTableAddColumns(dialect.RedshiftDialect{}, dialect.NewTableIdentifier("schema", "table"), []columns.Column{col1, col2, col3})
assert.NoError(t, err)
assert.Len(t, sqlParts, 3)
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "aussie" VARCHAR(MAX)`, sqlParts[0])
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "doge" VARCHAR(MAX)`, sqlParts[1])
assert.Equal(t, `ALTER TABLE schema."table" add COLUMN "age" INT8`, sqlParts[2])
}
}

func TestAlterTableArgs_Validate(t *testing.T) {
{
// Invalid
Expand Down

0 comments on commit d461370

Please sign in to comment.