From e03428184e931aa908d0db5b6e31400f45e9baab Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 15 Nov 2024 08:15:19 -0800 Subject: [PATCH] Remove more usages of columns (#1046) --- clients/shared/append.go | 4 +-- clients/shared/merge.go | 4 +-- clients/snowflake/ddl_test.go | 2 +- lib/destination/ddl/ddl_alter_delete_test.go | 37 ++++++++++---------- lib/destination/ddl/ddl_bq_test.go | 22 ++++++------ lib/destination/ddl/ddl_sflk_test.go | 6 ++-- lib/destination/types/table_config.go | 7 ++++ lib/destination/types/table_config_test.go | 10 +++--- 8 files changed, 48 insertions(+), 44 deletions(-) diff --git a/clients/shared/append.go b/clients/shared/append.go index b36e96d81..1c6823599 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -25,7 +25,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim // We don't care about srcKeysMissing because we don't drop columns when we append. _, targetKeysMissing := columns.Diff( tableData.ReadOnlyInMemoryCols().GetColumns(), - tableConfig.Columns().GetColumns(), + tableConfig.GetColumns(), tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, tableData.TopicConfig().IncludeDatabaseUpdatedAt, @@ -48,7 +48,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim return fmt.Errorf("failed to alter table: %w", err) } - if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 9fd419de8..5ca79f16c 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -30,7 +30,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi srcKeysMissing, targetKeysMissing := columns.Diff( tableData.ReadOnlyInMemoryCols().GetColumns(), - tableConfig.Columns().GetColumns(), + tableConfig.GetColumns(), tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt, tableData.TopicConfig().IncludeDatabaseUpdatedAt, @@ -70,7 +70,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi } tableConfig.AuditColumnsToDelete(srcKeysMissing) - if err = tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...); err != nil { + if err = tableData.MergeColumnsFromDestination(tableConfig.GetColumns()...); err != nil { return fmt.Errorf("failed to merge columns from destination: %w", err) } diff --git a/clients/snowflake/ddl_test.go b/clients/snowflake/ddl_test.go index 23706fc77..b9cd38ea5 100644 --- a/clients/snowflake/ddl_test.go +++ b/clients/snowflake/ddl_test.go @@ -116,6 +116,6 @@ func (s *SnowflakeTestSuite) TestGetTableConfig() { assert.NoError(s.T(), err) assert.True(s.T(), tableConfig.CreateTable()) - assert.Equal(s.T(), len(tableConfig.Columns().GetColumns()), 0) + assert.Len(s.T(), tableConfig.GetColumns(), 0) assert.False(s.T(), tableConfig.DropDeletedColumns()) } diff --git a/lib/destination/ddl/ddl_alter_delete_test.go b/lib/destination/ddl/ddl_alter_delete_test.go index f4246210a..4db2a45f1 100644 --- a/lib/destination/ddl/ddl_alter_delete_test.go +++ b/lib/destination/ddl/ddl_alter_delete_test.go @@ -75,7 +75,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns()) + assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength) // BigQuery for _, column := range cols.GetColumns() { @@ -96,7 +96,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns()) + assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength) // Redshift for _, column := range cols.GetColumns() { @@ -116,7 +116,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) + assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength) // 2. DropDeletedColumns = true, ContainOtherOperations = false, don't delete ever d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) @@ -150,7 +150,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns()) + assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength) // BigQuery for _, column := range cols.GetColumns() { @@ -170,7 +170,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns()) + assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength) // Redshift for _, column := range cols.GetColumns() { @@ -190,7 +190,7 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { // Never actually deleted. assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) + assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength) // 3. DropDeletedColumns = true, ContainOtherOperations = true, drop based on timestamp. d.bigQueryStore.GetConfigMap().AddTableToConfig(bqTableID, types.NewDwhTableConfig(cols.GetColumns(), true)) @@ -258,13 +258,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { } // Nothing has been deleted, but it is all added to the permissions table. - assert.Equal(d.T(), originalColumnLength, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns()) - assert.Equal(d.T(), originalColumnLength, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) - assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns()) + assert.Len(d.T(), bqTc.GetColumns(), originalColumnLength) + assert.Len(d.T(), redshiftTc.GetColumns(), originalColumnLength) + assert.Len(d.T(), snowflakeTc.GetColumns(), originalColumnLength) - assert.Equal(d.T(), originalColumnLength, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), originalColumnLength, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) + assert.Len(d.T(), bqTc.ReadOnlyColumnsToDelete(), originalColumnLength) + assert.Len(d.T(), redshiftTc.ReadOnlyColumnsToDelete(), originalColumnLength) + assert.Len(d.T(), snowflakeTc.ReadOnlyColumnsToDelete(), originalColumnLength) for _, column := range cols.GetColumns() { alterTableArgs := ddl.AlterTableArgs{ @@ -310,14 +310,13 @@ func (d *DDLTestSuite) TestAlterDelete_Complete() { } // Everything has been deleted. - assert.Equal(d.T(), 0, len(snowflakeTc.Columns().GetColumns()), snowflakeTc.Columns().GetColumns()) - assert.Equal(d.T(), 0, len(bqTc.Columns().GetColumns()), bqTc.Columns().GetColumns()) - assert.Equal(d.T(), 0, len(redshiftTc.Columns().GetColumns()), redshiftTc.Columns().GetColumns()) - - assert.Equal(d.T(), 0, len(snowflakeTc.ReadOnlyColumnsToDelete()), snowflakeTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), 0, len(bqTc.ReadOnlyColumnsToDelete()), bqTc.ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), 0, len(redshiftTc.ReadOnlyColumnsToDelete()), redshiftTc.ReadOnlyColumnsToDelete()) + assert.Empty(d.T(), snowflakeTc.GetColumns()) + assert.Empty(d.T(), bqTc.GetColumns()) + assert.Empty(d.T(), redshiftTc.GetColumns()) + assert.Empty(d.T(), snowflakeTc.ReadOnlyColumnsToDelete()) + assert.Empty(d.T(), bqTc.ReadOnlyColumnsToDelete()) + assert.Empty(d.T(), redshiftTc.ReadOnlyColumnsToDelete()) allColsMap := make(map[string]bool, len(allCols)) for _, allCol := range allCols { allColsMap[allCol] = true diff --git a/lib/destination/ddl/ddl_bq_test.go b/lib/destination/ddl/ddl_bq_test.go index 4c6734078..5f405035c 100644 --- a/lib/destination/ddl/ddl_bq_test.go +++ b/lib/destination/ddl/ddl_bq_test.go @@ -66,7 +66,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { // Have not deleted, but tried to! assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) // Columns have not been deleted yet. - assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength) // Now try to delete again and with an increased TS. It should now be all deleted. var callIdx int @@ -91,7 +91,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { // Columns have now been deleted. assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) // Columns have not been deleted yet. - assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), 0) assert.Equal(d.T(), originalColumnLength, d.fakeBigQueryStore.ExecCallCount()) } @@ -121,8 +121,8 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true)) // Prior to adding, there should be no colsToDelete - assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete(), 0) + assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns())) var callIdx int tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) @@ -147,9 +147,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumns() { } // Check all the columns, make sure it's correct. (length) - assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Equal(d.T(), newColsLen+existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns())) // Check by iterating over the columns - for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() { + for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() { existingCol, isOk := existingCols.GetColumn(column.Name()) if !isOk { // Check new cols? @@ -180,7 +180,7 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { d.bigQueryStore.GetConfigMap().AddTableToConfig(tableID, types.NewDwhTableConfig(existingCols.GetColumns(), true)) // Prior to adding, there should be no colsToDelete assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete()) - assert.Equal(d.T(), len(existingCols.GetColumns()), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Len(d.T(), existingCols.GetColumns(), len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns())) tc := d.bigQueryStore.GetConfigMap().TableConfigCache(tableID) var callIdx int @@ -206,9 +206,9 @@ func (d *DDLTestSuite) TestAlterTableAddColumnsSomeAlreadyExist() { } // Check all the columns, make sure it's correct. (length) - assert.Equal(d.T(), existingColsLen, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns()), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns()) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), existingColsLen) // Check by iterating over the columns - for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns() { + for _, column := range d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns() { existingCol, isOk := existingCols.GetColumn(column.Name()) assert.True(d.T(), isOk) assert.Equal(d.T(), column.KindDetails, existingCol.KindDetails) @@ -254,7 +254,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { } assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())) - assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns())) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength) // Now try to delete again and with an increased TS. It should now be all deleted. for _, column := range cols.GetColumns() { @@ -274,5 +274,5 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { // Columns still exist assert.Equal(d.T(), 0, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).ReadOnlyColumnsToDelete())) - assert.Equal(d.T(), originalColumnLength, len(d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).Columns().GetColumns())) + assert.Len(d.T(), d.bigQueryStore.GetConfigMap().TableConfigCache(tableID).GetColumns(), originalColumnLength) } diff --git a/lib/destination/ddl/ddl_sflk_test.go b/lib/destination/ddl/ddl_sflk_test.go index 142f7a76b..0bf3b1744 100644 --- a/lib/destination/ddl/ddl_sflk_test.go +++ b/lib/destination/ddl/ddl_sflk_test.go @@ -104,7 +104,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { // Check the table config tableConfig := d.snowflakeStagesStore.GetConfigMap().TableConfigCache(tableID) - for _, column := range tableConfig.Columns().GetColumns() { + for _, column := range tableConfig.GetColumns() { var found bool for _, expCol := range cols { if found = column.Name() == expCol.Name(); found { @@ -113,9 +113,7 @@ func (d *DDLTestSuite) TestAlterTableAdd() { } } - assert.True(d.T(), found, - fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", - column.Name(), tableConfig.Columns(), cols)) + assert.True(d.T(), found, fmt.Sprintf("Col not found: %s, actual list: %v, expected list: %v", column.Name(), tableConfig.GetColumns(), cols)) } } diff --git a/lib/destination/types/table_config.go b/lib/destination/types/table_config.go index 4f3c0f7d7..d517a47c3 100644 --- a/lib/destination/types/table_config.go +++ b/lib/destination/types/table_config.go @@ -52,6 +52,13 @@ func (d *DwhTableConfig) DropDeletedColumns() bool { return d.dropDeletedColumns } +func (d *DwhTableConfig) GetColumns() []columns.Column { + d.RLock() + defer d.RUnlock() + + return d.columns.GetColumns() +} + func (d *DwhTableConfig) Columns() *columns.Columns { if d == nil { return nil diff --git a/lib/destination/types/table_config_test.go b/lib/destination/types/table_config_test.go index f555a4fac..01b9f2ec3 100644 --- a/lib/destination/types/table_config_test.go +++ b/lib/destination/types/table_config_test.go @@ -64,14 +64,14 @@ func TestDwhTableConfig_ColumnsConcurrency(t *testing.T) { go func(tableCfg *types.DwhTableConfig) { defer wg.Done() for j := 0; j < 100; j++ { - assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns()) - + assert.Len(t, tableCfg.GetColumns(), 3) kindDetails := typing.Integer if (j % 2) == 0 { kindDetails = typing.Array } + tableCfg.Columns().UpdateColumn(columns.NewColumn("foo", kindDetails)) - assert.Equal(t, 3, len(tableCfg.Columns().GetColumns()), tableCfg.Columns().GetColumns()) + assert.Len(t, tableCfg.GetColumns(), 3) } }(dwhTableCfg) } @@ -85,7 +85,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { tc.MutateInMemoryColumns(false, constants.Add, columns.NewColumn(col, typing.String)) } - assert.Len(t, tc.Columns().GetColumns(), 5) + assert.Len(t, tc.GetColumns(), 5) var wg sync.WaitGroup for _, addCol := range []string{"aa", "bb", "cc", "dd", "ee", "ff"} { wg.Add(1) @@ -104,7 +104,7 @@ func TestDwhTableConfig_MutateInMemoryColumns(t *testing.T) { } wg.Wait() - assert.Len(t, tc.Columns().GetColumns(), 6) + assert.Len(t, tc.GetColumns(), 6) } func TestDwhTableConfig_ReadOnlyColumnsToDelete(t *testing.T) {