Skip to content

Commit

Permalink
[DDL] Remove TemporaryTable flag (#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 14, 2024
1 parent 1d16234 commit ef280a5
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 95 deletions.
18 changes: 4 additions & 14 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,9 @@ type AlterTableArgs struct {
ContainOtherOperations bool
TableID sql.TableIdentifier
CreateTable bool
TemporaryTable bool

ColumnOp constants.ColumnOperation
Mode config.Mode

CdcTime time.Time
ColumnOp constants.ColumnOperation
Mode config.Mode
CdcTime time.Time
}

func (a AlterTableArgs) Validate() error {
Expand All @@ -98,13 +95,6 @@ func (a AlterTableArgs) Validate() error {
return fmt.Errorf("unexpected mode: %s", a.Mode.String())
}

// Temporary tables should only be created, not altered.
if a.TemporaryTable {
if !a.CreateTable {
return fmt.Errorf("incompatible operation - we should not be altering temporary tables, only create")
}
}

return nil
}

Expand Down Expand Up @@ -154,7 +144,7 @@ func (a AlterTableArgs) buildStatements(cols ...columns.Column) ([]string, []col

var alterStatements []string
if a.CreateTable {
alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, a.TemporaryTable, colSQLParts)}
alterStatements = []string{a.Dialect.BuildCreateTableQuery(a.TableID, false, colSQLParts)}
} else {
for _, colSQLPart := range colSQLParts {
alterStatements = append(alterStatements, a.Dialect.BuildAlterColumnQuery(a.TableID, a.ColumnOp, colSQLPart))
Expand Down
77 changes: 6 additions & 71 deletions lib/destination/ddl/ddl_temp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ddl_test

import (
"fmt"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -12,91 +11,27 @@ import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/mocks"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (d *DDLTestSuite) TestCreateTemporaryTable_Errors() {
tableID := dialect.NewTableIdentifier("", "mock_dataset", "mock_table")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
snowflakeTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: snowflakeTc,
TableID: tableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
CdcTime: time.Time{},
Mode: config.Replication,
}

// No columns.
assert.NoError(d.T(), args.AlterTable(d.snowflakeStagesStore))

args.ColumnOp = constants.Delete
assert.ErrorContains(d.T(), args.AlterTable(d.snowflakeStagesStore), "incompatible operation - cannot drop columns and create table at the same time")

// Change it to SFLK + Stage
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
snowflakeStagesTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args.Dialect = d.snowflakeStagesStore.Dialect()
args.Tc = snowflakeStagesTc
args.CreateTable = false

assert.ErrorContains(d.T(), args.AlterTable(d.snowflakeStagesStore), "incompatible operation - we should not be altering temporary tables, only create")
}

func (d *DDLTestSuite) TestCreateTemporaryTable() {
{
// Snowflake Stage
tableID := dialect.NewTableIdentifier("db", "schema", "tempTableName")
d.snowflakeStagesStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
sflkStageTc := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.snowflakeStagesStore.Dialect(),
Tc: sflkStageTc,
TableID: tableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
CdcTime: time.Time{},
Mode: config.Replication,
}

assert.NoError(d.T(), args.AlterTable(d.snowflakeStagesStore, columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)))
assert.Equal(d.T(), 1, d.fakeSnowflakeStagesStore.ExecCallCount())
query, _ := d.fakeSnowflakeStagesStore.ExecArgsForCall(0)

assert.Contains(d.T(),
query,
`CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`,
query)
query, err := ddl.BuildCreateTableSQL(d.snowflakeStagesStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("start", typing.String)})
assert.NoError(d.T(), err)
assert.Equal(d.T(), query, `CREATE TABLE IF NOT EXISTS db.schema."TEMPTABLENAME" ("FOO" string,"BAR" float,"START" string) STAGE_COPY_OPTIONS = ( PURGE = TRUE ) STAGE_FILE_FORMAT = ( TYPE = 'csv' FIELD_DELIMITER= '\t' FIELD_OPTIONALLY_ENCLOSED_BY='"' NULL_IF='\\N' EMPTY_FIELD_AS_NULL=FALSE)`)
}
{
// BigQuery
tableID := bigQueryDialect.NewTableIdentifier("db", "schema", "tempTableName")
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(nil, true))
bqTc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
args := ddl.AlterTableArgs{
Dialect: d.bigQueryStore.Dialect(),
Tc: bqTc,
TableID: tableID,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
CdcTime: time.Time{},
Mode: config.Replication,
}

assert.NoError(d.T(), args.AlterTable(d.bigQueryStore, columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)))
assert.Equal(d.T(), 1, d.fakeBigQueryStore.ExecCallCount())
bqQuery, _ := d.fakeBigQueryStore.ExecArgsForCall(0)
query, err := ddl.BuildCreateTableSQL(d.bigQueryStore.Dialect(), tableID, true, config.Replication, []columns.Column{columns.NewColumn("foo", typing.String), columns.NewColumn("bar", typing.Float), columns.NewColumn("select", typing.String)})
assert.NoError(d.T(), err)
// Cutting off the expiration_timestamp since it's time based.
assert.Contains(d.T(), bqQuery, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (`foo` string,`bar` float64,`select` string) OPTIONS (expiration_timestamp =")
assert.Contains(d.T(), query, "CREATE TABLE IF NOT EXISTS `db`.`schema`.`tempTableName` (`foo` string,`bar` float64,`select` string) OPTIONS (expiration_timestamp =", query)
}
}

Expand Down
14 changes: 4 additions & 10 deletions lib/destination/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,14 @@ func TestAlterTableArgs_Validate(t *testing.T) {
a.Dialect = bqDialect.BigQueryDialect{}
assert.ErrorContains(t, a.Validate(), "incompatible operation - cannot drop columns and create table at the same time")
}
{
a.CreateTable = false
a.TemporaryTable = true
assert.ErrorContains(t, a.Validate(), "incompatible operation - we should not be altering temporary tables, only create")
}
}
{
// Valid
a := AlterTableArgs{
ColumnOp: constants.Add,
CreateTable: true,
TemporaryTable: true,
Mode: config.Replication,
Dialect: bqDialect.BigQueryDialect{},
ColumnOp: constants.Add,
CreateTable: true,
Mode: config.Replication,
Dialect: bqDialect.BigQueryDialect{},
}

assert.NoError(t, a.Validate())
Expand Down

0 comments on commit ef280a5

Please sign in to comment.