From b36eb7a537f7f5a8f34aab38aefd1ae45514943d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 20 Nov 2024 11:39:26 -0800 Subject: [PATCH 1/4] Adding columns. --- clients/bigquery/bigquery.go | 2 +- clients/databricks/store.go | 2 +- clients/mssql/staging.go | 2 +- clients/redshift/staging.go | 2 +- clients/shared/append.go | 2 +- clients/shared/merge.go | 2 +- clients/shared/table.go | 4 ++-- clients/snowflake/staging.go | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 5b3332506..8ee3707c3 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -82,7 +82,7 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return err } } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index 6713468d1..b4f5c555d 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -82,7 +82,7 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return err } } diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index 5bb2ebe2e..b289e48d6 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -15,7 +15,7 @@ import ( func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return err } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index 99d465c77..f2732ba3c 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -40,7 +40,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if createTempTable { - if err = shared.CreateTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID, true); err != nil { + if err = shared.CreateTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return err } } diff --git a/clients/shared/append.go b/clients/shared/append.go index ec762064c..54f60d905 100644 --- a/clients/shared/append.go +++ b/clients/shared/append.go @@ -32,7 +32,7 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { - if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil { + if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, targetKeysMissing); err != nil { return fmt.Errorf("failed to create table: %w", err) } } else { diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 41ce87001..2f00a5837 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -38,7 +38,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { - if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil { + if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return fmt.Errorf("failed to create table: %w", err) } } else { diff --git a/clients/shared/table.go b/clients/shared/table.go index aab6809a4..565333579 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -16,8 +16,8 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) -func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool) error { - query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), tableData.ReadOnlyInMemoryCols().GetColumns()) +func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool, cols []columns.Column) error { + query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), cols) if err != nil { return fmt.Errorf("failed to build create table sql: %w", err) } diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 4a09b6a12..58693270e 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -52,7 +52,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, additionalSettings.ColumnSettings, tempTableID, true); err != nil { + if err := shared.CreateTable(ctx, s, tableData, dwh, additionalSettings.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { return err } } From e9d8998a93e4df0d112332313b72cf15f669f4f7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 20 Nov 2024 11:43:05 -0800 Subject: [PATCH 2/4] Update. --- clients/shared/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/shared/table.go b/clients/shared/table.go index 565333579..1c7b141d8 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -28,7 +28,7 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData * } // Update cache with the new columns that we've added. - tc.MutateInMemoryColumns(constants.Add, tableData.ReadOnlyInMemoryCols().GetColumns()...) + tc.MutateInMemoryColumns(constants.Add, cols...) return nil } From 54b96d60c16f250ee98380b5c016627d17ecdc7e Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 20 Nov 2024 11:43:28 -0800 Subject: [PATCH 3/4] More. --- clients/shared/merge.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 2f00a5837..24aeb19d6 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -38,7 +38,7 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name()) if tableConfig.CreateTable() { - if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false, targetKeysMissing); err != nil { return fmt.Errorf("failed to create table: %w", err) } } else { From 99103ae80ec14939140a1ce8c51b67a72047696d Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 20 Nov 2024 11:47:03 -0800 Subject: [PATCH 4/4] Separate out the functionality. --- clients/bigquery/bigquery.go | 2 +- clients/databricks/store.go | 2 +- clients/mssql/staging.go | 2 +- clients/redshift/staging.go | 2 +- clients/shared/table.go | 4 ++++ clients/snowflake/staging.go | 2 +- 6 files changed, 9 insertions(+), 5 deletions(-) diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index 8ee3707c3..9485b41f1 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -82,7 +82,7 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err := shared.CreateTempTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID); err != nil { return err } } diff --git a/clients/databricks/store.go b/clients/databricks/store.go index b4f5c555d..74d86b2c7 100644 --- a/clients/databricks/store.go +++ b/clients/databricks/store.go @@ -82,7 +82,7 @@ func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTabl func (s Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err := shared.CreateTempTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID); err != nil { return err } } diff --git a/clients/mssql/staging.go b/clients/mssql/staging.go index b289e48d6..1a1272c07 100644 --- a/clients/mssql/staging.go +++ b/clients/mssql/staging.go @@ -15,7 +15,7 @@ import ( func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, opts types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err := shared.CreateTempTable(ctx, s, tableData, dwh, opts.ColumnSettings, tempTableID); err != nil { return err } } diff --git a/clients/redshift/staging.go b/clients/redshift/staging.go index f2732ba3c..6bc298ad8 100644 --- a/clients/redshift/staging.go +++ b/clients/redshift/staging.go @@ -40,7 +40,7 @@ func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimizati } if createTempTable { - if err = shared.CreateTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err = shared.CreateTempTable(ctx, s, tableData, tableConfig, opts.ColumnSettings, tempTableID); err != nil { return err } } diff --git a/clients/shared/table.go b/clients/shared/table.go index 1c7b141d8..cc0a0dad1 100644 --- a/clients/shared/table.go +++ b/clients/shared/table.go @@ -16,6 +16,10 @@ import ( "github.com/artie-labs/transfer/lib/typing/columns" ) +func CreateTempTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier) error { + return CreateTable(ctx, dwh, tableData, tc, settings, tableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()) +} + func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool, cols []columns.Column) error { query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), cols) if err != nil { diff --git a/clients/snowflake/staging.go b/clients/snowflake/staging.go index 58693270e..c1ee740ab 100644 --- a/clients/snowflake/staging.go +++ b/clients/snowflake/staging.go @@ -52,7 +52,7 @@ func castColValStaging(colVal any, colKind typing.KindDetails) (string, error) { func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error { if createTempTable { - if err := shared.CreateTable(ctx, s, tableData, dwh, additionalSettings.ColumnSettings, tempTableID, true, tableData.ReadOnlyInMemoryCols().GetColumns()); err != nil { + if err := shared.CreateTempTable(ctx, s, tableData, dwh, additionalSettings.ColumnSettings, tempTableID); err != nil { return err } }