Skip to content

Commit

Permalink
Merge branch 'master' into alter-table
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 15, 2024
2 parents ae48e38 + e034281 commit aad7726
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 44 deletions.
4 changes: 2 additions & 2 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim
// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.Columns().GetColumns(),
tableConfig.GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
Expand Down Expand Up @@ -55,7 +55,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim

}

if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi

srcKeysMissing, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.Columns().GetColumns(),
tableConfig.GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
Expand Down Expand Up @@ -74,7 +74,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil {
if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil {
return fmt.Errorf("failed to merge columns from destination: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion clients/snowflake/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func (s *SnowflakeTestSuite) TestGetTableConfig() {
assert.NoError(s.T(), err)

assert.True(s.T(), tableConfig.CreateTable())
assert.Equal(s.T(), len(tableConfig.Columns().GetColumns()), 0)
assert.Len(s.T(), tableConfig.GetColumns(), 0)
assert.False(s.T(), tableConfig.DropDeletedColumns())
}
37 changes: 18 additions & 19 deletions lib/destination/ddl/ddl_alter_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

// BigQuery
for _, column := range cols.GetColumns() {
Expand All @@ -94,7 +94,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)

// Redshift
for _, column := range cols.GetColumns() {
Expand All @@ -113,7 +113,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)

// 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
Expand Down Expand Up @@ -146,7 +146,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

// BigQuery
for _, column := range cols.GetColumns() {
Expand All @@ -165,7 +165,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)

// Redshift
for _, column := range cols.GetColumns() {
Expand All @@ -184,7 +184,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {

// Never actually deleted.
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)

// 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp.
d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true))
Expand Down Expand Up @@ -249,13 +249,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
}

// Nothing has been deleted, but it is all added to the permissions table.
assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength)
assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength)
assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength)

assert.Equal(d.T(), originalColumnLength, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Len(d.T(), bqTc.ReadOnlyColumnsToDelete(), originalColumnLength)
assert.Len(d.T(), redshiftTc.ReadOnlyColumnsToDelete(), originalColumnLength)
assert.Len(d.T(), snowflakeTc.ReadOnlyColumnsToDelete(), originalColumnLength)

for _, column := range cols.GetColumns() {
alterTableArgs := ddl.AlterTableArgs{
Expand Down Expand Up @@ -298,14 +298,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() {
}

// Everything has been deleted.
assert.Equal(d.T(), 0, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns())
assert.Equal(d.T(), 0, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns())
assert.Equal(d.T(), 0, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns())

assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete())
assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), snowflakeTc.GetColumns())
assert.Empty(d.T(), bqTc.GetColumns())
assert.Empty(d.T(), redshiftTc.GetColumns())

assert.Empty(d.T(), snowflakeTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), bqTc.ReadOnlyColumnsToDelete())
assert.Empty(d.T(), redshiftTc.ReadOnlyColumnsToDelete())
allColsMap := make(map[string]bool, len(allCols))
for _, allCol := range allCols {
allColsMap[allCol] = true
Expand Down
22 changes: 11 additions & 11 deletions lib/destination/ddl/ddl_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
// Have not deleted, but tried to!
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
// Columns have not been deleted yet.
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)

// Now try to delete again and with an increased TS. It should now be all deleted.
var callIdx int
Expand All @@ -89,7 +89,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() {
// Columns have now been deleted.
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
// Columns have not been deleted yet.
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), 0)
assert.Equal(d.T(), originalColumnLength, d.fakeBigQueryStore.ExecCallCount())
}

Expand Down Expand Up @@ -119,8 +119,8 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {

d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete(), 0)
assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))

var callIdx int
tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
Expand All @@ -144,9 +144,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() {
}

// Check all the columns, make sure it's correct. (length)
assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() {
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() {
existingCol, isOk := existingCols.GetColumn(column.Name())
if !isOk {
// Check new cols?
Expand Down Expand Up @@ -177,7 +177,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true))
// Prior to adding, there should be no colsToDelete
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())
assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns()))

tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID)
var callIdx int
Expand All @@ -202,9 +202,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() {
}

// Check all the columns, make sure it's correct. (length)
assert.Equal(d.T(), existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns())
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), existingColsLen)
// Check by iterating over the columns
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() {
for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() {
existingCol, isOk := existingCols.GetColumn(column.Name())
assert.True(d.T(), isOk)
assert.Equal(d.T(), column.KindDetails, existingCol.KindDetails)
Expand Down Expand Up @@ -249,7 +249,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {
}

assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()))
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()))
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)

// Now try to delete again and with an increased TS. It should now be all deleted.
for _, column := range cols.GetColumns() {
Expand All @@ -268,5 +268,5 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() {

// Columns still exist
assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()))
assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()))
assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength)
}
6 changes: 2 additions & 4 deletions lib/destination/ddl/ddl_sflk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {

// Check the table config
tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID)
for _, column := range tableConfig.Columns().GetColumns() {
for _, column := range tableConfig.GetColumns() {
var found bool
for _, expCol := range cols {
if found = column.Name() == expCol.Name(); found {
Expand All @@ -113,9 +113,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() {
}
}

assert.True(d.T(), found,
fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v",
column.Name(), tableConfig.Columns(), cols))
assert.True(d.T(), found, fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", column.Name(), tableConfig.GetColumns(), cols))
}
}

Expand Down
7 changes: 7 additions & 0 deletions lib/destination/types/table_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (d *DwhTableConfig) DropDeletedColumns() bool {
return d.dropDeletedColumns
}

func (d *DwhTableConfig) GetColumns() []columns.Column {
d.RLock()
defer d.RUnlock()

return d.columns.GetColumns()
}

func (d *DwhTableConfig) Columns() *columns.Columns {
if d == nil {
return nil
Expand Down
10 changes: 5 additions & 5 deletions lib/destination/types/table_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) {
go func(tableCfg *types.DwhTableConfig) {
defer wg.Done()
for j := 0; j < 100; j++ {
assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns())

assert.Len(t, tableCfg.GetColumns(), 3)
kindDetails := typing.Integer
if (j % 2) == 0 {
kindDetails = typing.Array
}

tableCfg.Columns().UpdateColumn(columns.NewColumn("foo", kindDetails))
assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns())
assert.Len(t, tableCfg.GetColumns(), 3)
}
}(dwhTableCfg)
}
Expand All @@ -85,7 +85,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String))
}

assert.Len(t, tc.Columns().GetColumns(), 5)
assert.Len(t, tc.GetColumns(), 5)
var wg sync.WaitGroup
for _, addCol := range []string{"aa", "bb", "cc", "dd", "ee", "ff"} {
wg.Add(1)
Expand All @@ -104,7 +104,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) {
}

wg.Wait()
assert.Len(t, tc.Columns().GetColumns(), 6)
assert.Len(t, tc.GetColumns(), 6)
}

func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) {
Expand Down

0 comments on commit aad7726

Please sign in to comment.