From 2a5547f8045ca4d4a0efcaeafaddda976cfe7bde Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Thu, 21 Sep 2023 12:59:13 +0200 Subject: [PATCH] chore: moving uploadSchema into UploadJob (#3888) --- warehouse/internal/repo/upload.go | 11 +- warehouse/schema.go | 87 ++++++++------- warehouse/schema_test.go | 42 +++----- warehouse/upload.go | 173 ++++++++++++++---------------- 4 files changed, 146 insertions(+), 167 deletions(-) diff --git a/warehouse/internal/repo/upload.go b/warehouse/internal/repo/upload.go index fd140aa187..8701dbf286 100644 --- a/warehouse/internal/repo/upload.go +++ b/warehouse/internal/repo/upload.go @@ -240,18 +240,19 @@ func (uploads *Uploads) Count(ctx context.Context, filters ...FilterBy) (int64, } func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error) { - row := uploads.db.QueryRowContext(ctx, ` - SELECT + row := uploads.db.QueryRowContext(ctx, + `SELECT `+uploadColumns+` FROM `+uploadsTableName+` WHERE - id = $1 - `, id) + id = $1`, + id, + ) var upload model.Upload err := scanUpload(row.Scan, &upload) - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return model.Upload{}, model.ErrUploadNotFound } if err != nil { diff --git a/warehouse/schema.go b/warehouse/schema.go index 384de8918f..558a04fdef 100644 --- a/warehouse/schema.go +++ b/warehouse/schema.go @@ -7,19 +7,16 @@ import ( "reflect" "regexp" - "golang.org/x/exp/slices" - "github.com/samber/lo" + "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - "github.com/rudderlabs/rudder-server/warehouse/internal/repo" - "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/internal/repo" "github.com/rudderlabs/rudder-server/warehouse/logfield" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) var ( @@ -30,7 +27,9 @@ var ( // deprecatedColumnsRegex // This regex is used to identify deprecated columns in the warehouse // Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840 -var deprecatedColumnsRegex = regexp.MustCompile(`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) +var deprecatedColumnsRegex = regexp.MustCompile( + `.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`, +) type schemaRepo interface { GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error) @@ -50,7 +49,6 @@ type Schema struct { localSchema model.Schema schemaInWarehouse model.Schema unrecognizedSchemaInWarehouse model.Schema - uploadSchema model.Schema schemaRepo schemaRepo stagingFileRepo stagingFileRepo log logger.Logger @@ -63,12 +61,13 @@ func NewSchema( db *sqlquerywrapper.DB, warehouse model.Warehouse, conf *config.Config, + logger logger.Logger, ) *Schema { return &Schema{ warehouse: warehouse, schemaRepo: repo.NewWHSchemas(db), stagingFileRepo: repo.NewStagingFiles(db), - log: logger.NewLogger().Child("warehouse").Child("schema"), + log: logger, stagingFilesSchemaPaginationSize: conf.GetInt("Warehouse.stagingFilesSchemaPaginationSize", 100), skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false), enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false), @@ -128,8 +127,8 @@ func (sh *Schema) fetchSchemaFromWarehouse(ctx context.Context, repo fetchSchema return 
fmt.Errorf("fetching schema from warehouse: %w", err) } - sh.skipDeprecatedColumns(warehouseSchema) - sh.skipDeprecatedColumns(unrecognizedWarehouseSchema) + sh.removeDeprecatedColumns(warehouseSchema) + sh.removeDeprecatedColumns(unrecognizedWarehouseSchema) sh.schemaInWarehouse = warehouseSchema sh.unrecognizedSchemaInWarehouse = unrecognizedWarehouseSchema @@ -137,8 +136,8 @@ func (sh *Schema) fetchSchemaFromWarehouse(ctx context.Context, repo fetchSchema return nil } -// skipDeprecatedColumns skips deprecated columns from the schema -func (sh *Schema) skipDeprecatedColumns(schema model.Schema) { +// removeDeprecatedColumns skips deprecated columns from the schema map +func (sh *Schema) removeDeprecatedColumns(schema model.Schema) { for tableName, columnMap := range schema { for columnName := range columnMap { if deprecatedColumnsRegex.MatchString(columnName) { @@ -152,20 +151,18 @@ func (sh *Schema) skipDeprecatedColumns(schema model.Schema) { logfield.ColumnName, columnName, ) delete(schema[tableName], columnName) - continue } } } } -func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) error { +func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) (model.Schema, error) { consolidatedSchema, err := sh.consolidateStagingFilesSchemaUsingWarehouseSchema(ctx, stagingFiles) if err != nil { - return fmt.Errorf("consolidating staging files schema: %w", err) + return nil, fmt.Errorf("consolidating staging files schema: %w", err) } - sh.uploadSchema = consolidatedSchema - return nil + return consolidatedSchema, nil } // consolidateStagingFilesSchemaUsingWarehouseSchema consolidates staging files schema with warehouse schema @@ -248,10 +245,10 @@ func consolidateWarehouseSchema(consolidatedSchema, warehouseSchema model.Schema // Removes the user_id column from the users table func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehouseType string, warehouseSchema model.Schema) model.Schema { var ( - usersTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.UsersTable) - identifiesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentifiesTable) - userIDColumn = warehouseutils.ToProviderCase(warehouseType, "user_id") - IDColumn = warehouseutils.ToProviderCase(warehouseType, "id") + usersTable = whutils.ToProviderCase(warehouseType, whutils.UsersTable) + identifiesTable = whutils.ToProviderCase(warehouseType, whutils.IdentifiesTable) + userIDColumn = whutils.ToProviderCase(warehouseType, "user_id") + IDColumn = whutils.ToProviderCase(warehouseType, "id") ) if _, ok := consolidatedSchema[usersTable]; !ok { return consolidatedSchema @@ -279,15 +276,15 @@ func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehous func enhanceDiscardsSchema(consolidatedSchema model.Schema, warehouseType string) model.Schema { discards := model.TableSchema{} - for colName, colType := range warehouseutils.DiscardsSchema { - discards[warehouseutils.ToProviderCase(warehouseType, colName)] = colType + for colName, colType := range whutils.DiscardsSchema { + discards[whutils.ToProviderCase(warehouseType, colName)] = colType } - if warehouseType == warehouseutils.BQ { - discards[warehouseutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime" + if warehouseType == whutils.BQ { + discards[whutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime" } - consolidatedSchema[warehouseutils.ToProviderCase(warehouseType, warehouseutils.DiscardsTable)] = discards + 
consolidatedSchema[whutils.ToProviderCase(warehouseType, whutils.DiscardsTable)] = discards return consolidatedSchema } @@ -297,28 +294,28 @@ func enhanceSchemaWithIDResolution(consolidatedSchema model.Schema, isIDResoluti return consolidatedSchema } var ( - mergeRulesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMergeRulesTable) - mappingsTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMappingsTable) + mergeRulesTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMergeRulesTable) + mappingsTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMappingsTable) ) if _, ok := consolidatedSchema[mergeRulesTable]; ok { consolidatedSchema[mergeRulesTable] = model.TableSchema{ - warehouseutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string", } consolidatedSchema[mappingsTable] = model.TableSchema{ - warehouseutils.ToProviderCase(warehouseType, "merge_property_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_value"): "string", - warehouseutils.ToProviderCase(warehouseType, "rudder_id"): "string", - warehouseutils.ToProviderCase(warehouseType, "updated_at"): "datetime", + whutils.ToProviderCase(warehouseType, "merge_property_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_value"): "string", + whutils.ToProviderCase(warehouseType, "rudder_id"): "string", + whutils.ToProviderCase(warehouseType, "updated_at"): "datetime", } } return consolidatedSchema } func (sh *Schema) isIDResolutionEnabled() bool { - return sh.enableIDResolution && slices.Contains(warehouseutils.IdentityEnabledWarehouses, sh.warehouse.Type) + return sh.enableIDResolution && slices.Contains(whutils.IdentityEnabledWarehouses, sh.warehouse.Type) } // hasSchemaChanged compares the localSchema with the schemaInWarehouse @@ -350,9 +347,9 @@ func (sh *Schema) hasSchemaChanged() bool { return false } -// generateTableSchemaDiff returns the diff between the warehouse schema and the upload schema -func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.TableSchemaDiff { - diff := warehouseutils.TableSchemaDiff{ +// TableSchemaDiff returns the diff between the warehouse schema and the upload schema +func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff { + diff := whutils.TableSchemaDiff{ ColumnMap: make(model.TableSchema), UpdatedSchema: make(model.TableSchema), AlteredColumnMap: make(model.TableSchema), @@ -360,13 +357,13 @@ func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.Table currentTableSchema, ok := sh.schemaInWarehouse[tableName] if !ok { - if _, ok := sh.uploadSchema[tableName]; !ok { + if len(tableSchema) == 0 { return diff } diff.Exists = true diff.TableToBeCreated = true - diff.ColumnMap = sh.uploadSchema[tableName] - diff.UpdatedSchema = sh.uploadSchema[tableName] + diff.ColumnMap = tableSchema + diff.UpdatedSchema = tableSchema return diff 
} @@ -375,7 +372,7 @@ func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.Table } diff.ColumnMap = make(model.TableSchema) - for columnName, columnType := range sh.uploadSchema[tableName] { + for columnName, columnType := range tableSchema { if _, ok := currentTableSchema[columnName]; !ok { diff.ColumnMap[columnName] = columnType diff.UpdatedSchema[columnName] = columnType diff --git a/warehouse/schema_test.go b/warehouse/schema_test.go index 91d4d4a412..ad3b44978a 100644 --- a/warehouse/schema_test.go +++ b/warehouse/schema_test.go @@ -585,17 +585,16 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { func TestSchema_GetUploadSchemaDiff(t *testing.T) { testCases := []struct { - name string - tableName string - currentSchema model.Schema - uploadSchema model.Schema - expected warehouseutils.TableSchemaDiff + name string + tableName string + currentSchema model.Schema + uploadTableSchema model.TableSchema + expected warehouseutils.TableSchemaDiff }{ { name: "empty current and upload schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{}, expected: warehouseutils.TableSchemaDiff{ ColumnMap: model.TableSchema{}, UpdatedSchema: model.TableSchema{}, @@ -606,10 +605,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { name: "empty current schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value", }, expected: warehouseutils.TableSchemaDiff{ Exists: true, @@ -631,10 +628,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column": "test-value-1", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, expected: warehouseutils.TableSchemaDiff{ Exists: false, @@ -655,10 +650,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, expected: warehouseutils.TableSchemaDiff{ Exists: true, @@ -683,10 +676,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "text", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "text", }, expected: warehouseutils.TableSchemaDiff{ Exists: true, @@ -710,9 +701,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { sch := Schema{ schemaInWarehouse: tc.currentSchema, - uploadSchema: tc.uploadSchema, } - diff := sch.generateTableSchemaDiff(tc.tableName) + diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema) require.EqualValues(t, diff, tc.expected) }) } @@ -1901,13 +1891,13 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { stagingFilesSchemaPaginationSize: 2, } - err := sh.prepareUploadSchema(ctx, stagingFiles) + uploadSchema, err := sh.prepareUploadSchema(ctx, stagingFiles) if tc.wantError != nil { require.EqualError(t, err, tc.wantError.Error()) } else { require.NoError(t, err) } - require.Equal(t, tc.expectedSchema, sh.uploadSchema) + require.Equal(t, tc.expectedSchema, uploadSchema) }) } } diff --git a/warehouse/upload.go b/warehouse/upload.go index 724a33f0dd..ac25b13bc3 100644 --- a/warehouse/upload.go +++ 
b/warehouse/upload.go @@ -12,25 +12,22 @@ import ( "sync/atomic" "time" - "github.com/rudderlabs/rudder-server/services/notifier" - - "github.com/rudderlabs/rudder-server/warehouse/encoding" - - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/app" - "github.com/cenkalti/backoff/v4" "github.com/samber/lo" "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/app" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/services/alerta" + "github.com/rudderlabs/rudder-server/services/notifier" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/timeutil" "github.com/rudderlabs/rudder-server/utils/types" + "github.com/rudderlabs/rudder-server/warehouse/encoding" "github.com/rudderlabs/rudder-server/warehouse/identity" integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config" schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository" @@ -43,7 +40,7 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/internal/service" "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" "github.com/rudderlabs/rudder-server/warehouse/logfield" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" "github.com/rudderlabs/rudder-server/warehouse/validations" ) @@ -104,7 +101,6 @@ type UploadJob struct { stagingFiles []*model.StagingFile stagingFileIDs []int64 schemaLock sync.Mutex - uploadLock sync.Mutex alertSender alerta.AlertSender now func() time.Time @@ -170,8 +166,8 @@ const ( ) var ( - alwaysMarkExported = []string{warehouseutils.DiscardsTable} - warehousesToAlwaysRegenerateAllLoadFilesOnResume = []string{warehouseutils.SNOWFLAKE, warehouseutils.BQ} + alwaysMarkExported = []string{whutils.DiscardsTable} + warehousesToAlwaysRegenerateAllLoadFilesOnResume = []string{whutils.SNOWFLAKE, whutils.BQ} mergeSourceCategoryMap = map[string]struct{}{ "cloud": {}, "singer-protocol": {}, @@ -183,7 +179,7 @@ func init() { } func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJob, whManager manager.Manager) *UploadJob { - ujCtx := warehouseutils.CtxWithUploadID(ctx, dto.Upload.ID) + ujCtx := whutils.CtxWithUploadID(ctx, dto.Upload.ID) uj := &UploadJob{ ctx: ujCtx, @@ -202,6 +198,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo f.dbHandle, dto.Warehouse, config.Default, + f.logger.Child("warehouse").Child("schema"), ), upload: dto.Upload, @@ -228,8 +225,8 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo uj.config.reportingEnabled = f.conf.GetBool("Reporting.enabled", types.DefaultReportingEnabled) uj.config.generateTableLoadCountMetrics = f.conf.GetBool("Warehouse.generateTableLoadCountMetrics", true) uj.config.disableGenerateTableLoadCountMetricsWorkspaceIDs = f.conf.GetStringSlice("Warehouse.disableGenerateTableLoadCountMetricsWorkspaceIDs", nil) - uj.config.columnsBatchSize = f.conf.GetInt(fmt.Sprintf("Warehouse.%s.columnsBatchSize", warehouseutils.WHDestNameMap[uj.upload.DestinationType]), 100) - uj.config.maxParallelLoadsWorkspaceIDs = f.conf.GetStringMap(fmt.Sprintf("Warehouse.%s.maxParallelLoadsWorkspaceIDs", 
warehouseutils.WHDestNameMap[uj.upload.DestinationType]), nil) + uj.config.columnsBatchSize = f.conf.GetInt(fmt.Sprintf("Warehouse.%s.columnsBatchSize", whutils.WHDestNameMap[uj.upload.DestinationType]), 100) + uj.config.maxParallelLoadsWorkspaceIDs = f.conf.GetStringMap(fmt.Sprintf("Warehouse.%s.maxParallelLoadsWorkspaceIDs", whutils.WHDestNameMap[uj.upload.DestinationType]), nil) if f.conf.IsSet("Warehouse.tableCountQueryTimeout") { uj.config.tableCountQueryTimeout = f.conf.GetDuration("Warehouse.tableCountQueryTimeout", 30, time.Second) @@ -274,19 +271,19 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo } func (job *UploadJob) identifiesTableName() string { - return warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.IdentifiesTable) + return whutils.ToProviderCase(job.warehouse.Type, whutils.IdentifiesTable) } func (job *UploadJob) usersTableName() string { - return warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.UsersTable) + return whutils.ToProviderCase(job.warehouse.Type, whutils.UsersTable) } func (job *UploadJob) identityMergeRulesTableName() string { - return warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.IdentityMergeRulesTable) + return whutils.ToProviderCase(job.warehouse.Type, whutils.IdentityMergeRulesTable) } func (job *UploadJob) identityMappingsTableName() string { - return warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.IdentityMappingsTable) + return whutils.ToProviderCase(job.warehouse.Type, whutils.IdentityMappingsTable) } func (job *UploadJob) trackLongRunningUpload() chan struct{} { @@ -312,17 +309,25 @@ func (job *UploadJob) trackLongRunningUpload() chan struct{} { } func (job *UploadJob) generateUploadSchema() error { - if err := job.schemaHandle.prepareUploadSchema( - job.ctx, - job.stagingFiles, - ); err != nil { + uploadSchema, err := job.schemaHandle.prepareUploadSchema(job.ctx, job.stagingFiles) + if err != nil { return fmt.Errorf("consolidate staging files schema using warehouse schema: %w", err) } - if err := job.setUploadSchema(job.schemaHandle.uploadSchema); err != nil { + marshalledSchema, err := json.Marshal(uploadSchema) + if err != nil { + panic(err) + } + + err = job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{ + {Column: UploadSchemaField, Value: marshalledSchema}, + }}) + if err != nil { return fmt.Errorf("set upload schema: %w", err) } + job.upload.UploadSchema = uploadSchema + return nil } @@ -333,9 +338,9 @@ func (job *UploadJob) initTableUploads() error { for t := range schemaForUpload { tables = append(tables, t) // also track upload to rudder_identity_mappings if the upload has records for rudder_identity_merge_rules - if slices.Contains(warehouseutils.IdentityEnabledWarehouses, destType) && t == warehouseutils.ToProviderCase(destType, warehouseutils.IdentityMergeRulesTable) { - if _, ok := schemaForUpload[warehouseutils.ToProviderCase(destType, warehouseutils.IdentityMappingsTable)]; !ok { - tables = append(tables, warehouseutils.ToProviderCase(destType, warehouseutils.IdentityMappingsTable)) + if slices.Contains(whutils.IdentityEnabledWarehouses, destType) && t == whutils.ToProviderCase(destType, whutils.IdentityMergeRulesTable) { + if _, ok := schemaForUpload[whutils.ToProviderCase(destType, whutils.IdentityMappingsTable)]; !ok { + tables = append(tables, whutils.ToProviderCase(destType, whutils.IdentityMappingsTable)) } } } @@ -400,9 +405,9 @@ func (job *UploadJob) getTotalRowsInLoadFiles(ctx context.Context) int64 { 
row_number = 1 AND table_name != '%[3]s'; `, - warehouseutils.WarehouseLoadFilesTable, + whutils.WarehouseLoadFilesTable, misc.IntArrayToString(job.stagingFileIDs, ","), - warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.DiscardsTable), + whutils.ToProviderCase(job.warehouse.Type, whutils.DiscardsTable), ) if err := job.dbHandle.QueryRowContext(ctx, sqlStatement).Scan(&total); err != nil { job.logger.Errorf(`Error in getTotalRowsInLoadFiles: %v`, err) @@ -433,8 +438,6 @@ func (job *UploadJob) run() (err error) { ch <- struct{}{} }() - job.uploadLock.Lock() - defer job.uploadLock.Unlock() _ = job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadLastExecAtField, Value: job.now()}, {Column: UploadInProgress, Value: true}}}) if len(job.stagingFiles) == 0 { @@ -444,7 +447,7 @@ func (job *UploadJob) run() (err error) { } whManager := job.whManager - whManager.SetConnectionTimeout(warehouseutils.GetConnectionTimeout( + whManager.SetConnectionTimeout(whutils.GetConnectionTimeout( job.warehouse.Type, job.warehouse.Destination.ID, )) err = whManager.Setup(job.ctx, job.warehouse, job) @@ -468,8 +471,6 @@ func (job *UploadJob) run() (err error) { job.logger.Infof("[WH] Remote schema changed for Warehouse: %s", job.warehouse.Identifier) } - job.schemaHandle.uploadSchema = job.upload.UploadSchema - userTables := []string{job.identifiesTableName(), job.usersTableName()} identityTables := []string{job.identityMergeRulesTableName(), job.identityMappingsTableName()} @@ -591,6 +592,8 @@ func (job *UploadJob) run() (err error) { wg.Add(3) rruntime.GoForWarehouse(func() { + defer wg.Done() + var succeededUserTableCount int for _, userTable := range userTables { if _, ok := currentJobSucceededTables[userTable]; ok { @@ -598,19 +601,19 @@ func (job *UploadJob) run() (err error) { } } if succeededUserTableCount >= len(userTables) { - wg.Done() return } - err = job.exportUserTables(loadFilesTableMap) + err := job.exportUserTables(loadFilesTableMap) if err != nil { loadErrorLock.Lock() loadErrors = append(loadErrors, err) loadErrorLock.Unlock() } - wg.Done() }) rruntime.GoForWarehouse(func() { + defer wg.Done() + var succeededIdentityTableCount int for _, identityTable := range identityTables { if _, ok := currentJobSucceededTables[identityTable]; ok { @@ -618,30 +621,29 @@ func (job *UploadJob) run() (err error) { } } if succeededIdentityTableCount >= len(identityTables) { - wg.Done() return } - err = job.exportIdentities() + err := job.exportIdentities() if err != nil { loadErrorLock.Lock() loadErrors = append(loadErrors, err) loadErrorLock.Unlock() } - wg.Done() }) rruntime.GoForWarehouse(func() { + defer wg.Done() + specialTables := make([]string, 0, len(userTables)+len(identityTables)) specialTables = append(specialTables, userTables...) specialTables = append(specialTables, identityTables...) 
- err = job.exportRegularTables(specialTables, loadFilesTableMap) + err := job.exportRegularTables(specialTables, loadFilesTableMap) if err != nil { loadErrorLock.Lock() loadErrors = append(loadErrors, err) loadErrorLock.Unlock() } - wg.Done() }) wg.Wait() @@ -741,7 +743,7 @@ func (job *UploadJob) exportUserTables(loadFilesTableMap map[tableNameT]bool) (e func (job *UploadJob) exportIdentities() (err error) { // Load Identities if enabled uploadSchema := job.upload.UploadSchema - if warehouseutils.IDResolutionEnabled() && slices.Contains(warehouseutils.IdentityEnabledWarehouses, job.warehouse.Type) { + if whutils.IDResolutionEnabled() && slices.Contains(whutils.IdentityEnabledWarehouses, job.warehouse.Type) { if _, ok := uploadSchema[job.identityMergeRulesTableName()]; ok { defer job.stats.identityTablesLoadTime.RecordDuration()() @@ -838,7 +840,7 @@ func (job *UploadJob) resolveIdentities(populateHistoricIdentities bool) (err er return idr.Resolve(job.ctx) } -func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiff) (err error) { +func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff whutils.TableSchemaDiff) (err error) { job.logger.Infof(`[WH]: Starting schema update for table %s in namespace %s of destination %s:%s`, tName, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID) if tableSchemaDiff.TableToBeCreated { err = job.whManager.CreateTable(job.ctx, tName, tableSchemaDiff.ColumnMap) @@ -938,7 +940,7 @@ func (job *UploadJob) alterColumnsToWarehouse(ctx context.Context, tName string, func (job *UploadJob) addColumnsToWarehouse(ctx context.Context, tName string, columnsMap model.TableSchema) (err error) { job.logger.Infof(`[WH]: Adding columns for table %s in namespace %s of destination %s:%s`, tName, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID) - var columnsToAdd []warehouseutils.ColumnInfo + var columnsToAdd []whutils.ColumnInfo for columnName, columnType := range columnsMap { // columns present in unrecognized schema should be skipped if unrecognizedSchema, ok := job.schemaHandle.unrecognizedSchemaInWarehouse[tName]; ok { @@ -947,7 +949,7 @@ func (job *UploadJob) addColumnsToWarehouse(ctx context.Context, tName string, c } } - columnsToAdd = append(columnsToAdd, warehouseutils.ColumnInfo{Name: columnName, Type: columnType}) + columnsToAdd = append(columnsToAdd, whutils.ColumnInfo{Name: columnName, Type: columnType}) } chunks := lo.Chunk(columnsToAdd, job.config.columnsBatchSize) @@ -988,7 +990,7 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT wg.Add(len(uploadSchema)) var alteredSchemaInAtLeastOneTable atomic.Bool - loadChan := make(chan struct{}, parallelLoads) + concurrencyGuard := make(chan struct{}, parallelLoads) var ( err error @@ -1016,30 +1018,30 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT } hasLoadFiles := loadFilesTableMap[tableNameT(tableName)] if !hasLoadFiles { - wg.Done() if slices.Contains(alwaysMarkExported, strings.ToLower(tableName)) { status := model.TableUploadExported _ = job.tableUploadsRepo.Set(job.ctx, job.upload.ID, tableName, repo.TableUploadSetOptions{ Status: &status, }) } + wg.Done() continue } - tName := tableName - loadChan <- struct{}{} + tableName := tableName + concurrencyGuard <- struct{}{} rruntime.GoForWarehouse(func() { - alteredSchema, err := job.loadTable(tName) + alteredSchema, err := job.loadTable(tableName) if alteredSchema { 
alteredSchemaInAtLeastOneTable.Store(true) } - if err != nil { loadErrorLock.Lock() loadErrors = append(loadErrors, err) loadErrorLock.Unlock() } + + <-concurrencyGuard wg.Done() - <-loadChan }) } wg.Wait() @@ -1053,7 +1055,7 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT } func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) { - tableSchemaDiff := job.schemaHandle.generateTableSchemaDiff(tName) + tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, job.GetTableSchemaInUpload(tName)) if tableSchemaDiff.Exists { err = job.UpdateTableSchema(tName, tableSchemaDiff) if err != nil { @@ -1175,9 +1177,9 @@ func (job *UploadJob) loadTable(tName string) (bool, error) { } // TODO : Perform the comparison here in the codebase - job.guageStat(`pre_load_table_rows`, warehouseutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalBeforeLoad)) - job.guageStat(`post_load_table_rows_estimate`, warehouseutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalBeforeLoad + tableUpload.TotalEvents)) - job.guageStat(`post_load_table_rows`, warehouseutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalAfterLoad)) + job.guageStat(`pre_load_table_rows`, whutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalBeforeLoad)) + job.guageStat(`post_load_table_rows_estimate`, whutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalBeforeLoad + tableUpload.TotalEvents)) + job.guageStat(`post_load_table_rows`, whutils.Tag{Name: "tableName", Value: strings.ToLower(tName)}).Gauge(int(totalAfterLoad)) }() status = model.TableUploadExported @@ -1203,7 +1205,7 @@ func (job *UploadJob) columnCountStat(tableName string) { ) switch job.warehouse.Type { - case warehouseutils.S3Datalake, warehouseutils.GCSDatalake, warehouseutils.AzureDatalake: + case whutils.S3Datalake, whutils.GCSDatalake, whutils.AzureDatalake: return } @@ -1213,7 +1215,7 @@ func (job *UploadJob) columnCountStat(tableName string) { return } - tags := []warehouseutils.Tag{ + tags := []whutils.Tag{ {Name: "tableName", Value: strings.ToLower(tableName)}, } currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName]) @@ -1346,7 +1348,7 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE errorMap[tableName] = nil - tableSchemaDiff := job.schemaHandle.generateTableSchemaDiff(tableName) + tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, job.GetTableSchemaInUpload(tableName)) if tableSchemaDiff.Exists { err := job.UpdateTableSchema(tableName, tableSchemaDiff) if err != nil { @@ -1465,14 +1467,14 @@ func (job *UploadJob) getUploadFirstAttemptTime() (timing time.Time) { WHERE id = %d; `, - warehouseutils.WarehouseUploadsTable, + whutils.WarehouseUploadsTable, job.upload.ID, ) err := job.dbHandle.QueryRowContext(job.ctx, sqlStatement).Scan(&firstTiming) if err != nil { return } - _, timing = warehouseutils.TimingFromJSONString(firstTiming) + _, timing = whutils.TimingFromJSONString(firstTiming) return timing } @@ -1522,16 +1524,6 @@ func (job *UploadJob) setUploadStatus(statusOpts UploadStatusOpts) (err error) { return job.setUploadColumns(uploadColumnOpts) } -// SetUploadSchema -func (job *UploadJob) setUploadSchema(consolidatedSchema model.Schema) error { - marshalledSchema, err := json.Marshal(consolidatedSchema) - if err != nil { - panic(err) - } - job.upload.UploadSchema = consolidatedSchema - return 
job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadSchemaField, Value: marshalledSchema}}}) -} - // Set LoadFileIDs func (job *UploadJob) setLoadFileIDs(startLoadFileID, endLoadFileID int64) error { if startLoadFileID > endLoadFileID { @@ -1573,9 +1565,8 @@ func (job *UploadJob) setUploadColumns(opts UploadColumnsOpts) error { SET %s WHERE - id = $1; -`, - warehouseutils.WarehouseUploadsTable, + id = $1;`, + whutils.WarehouseUploadsTable, columns, ) @@ -1696,7 +1687,7 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e } serializedErr, _ := json.Marshal(&uploadErrors) - serializedErr = warehouseutils.SanitizeJSON(serializedErr) + serializedErr = whutils.SanitizeJSON(serializedErr) uploadColumns := []UploadColumn{ {Column: "status", Value: state}, @@ -1714,7 +1705,7 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e } inputCount, _ := repo.NewStagingFiles(job.dbHandle).TotalEventsForUpload(job.ctx, upload) outputCount, _ := job.tableUploadsRepo.TotalExportedEvents(job.ctx, job.upload.ID, []string{ - warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.DiscardsTable), + whutils.ToProviderCase(job.warehouse.Type, whutils.DiscardsTable), }) failCount := inputCount - outputCount @@ -1781,11 +1772,11 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e if state == model.Aborted { // base tag to be sent as stat - tags := []warehouseutils.Tag{errorTags} + tags := []whutils.Tag{errorTags} valid, err := job.validateDestinationCredentials() if err == nil { - tags = append(tags, warehouseutils.Tag{Name: "destination_creds_valid", Value: strconv.FormatBool(valid)}) + tags = append(tags, whutils.Tag{Name: "destination_creds_valid", Value: strconv.FormatBool(valid)}) destCredentialsValidations = &valid } @@ -1837,7 +1828,7 @@ func (job *UploadJob) getLoadFilesTableMap() (loadFilesMap map[tableNameT]bool, AND id <= $4 ); `, - warehouseutils.WarehouseLoadFilesTable, + whutils.WarehouseLoadFilesTable, ) /**/ sqlStatementArgs := []interface{}{ sourceID, @@ -1873,8 +1864,8 @@ func (job *UploadJob) getLoadFilesTableMap() (loadFilesMap map[tableNameT]bool, func (job *UploadJob) areIdentityTablesLoadFilesGenerated(ctx context.Context) (bool, error) { var ( - mergeRulesTable = warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.IdentityMergeRulesTable) - mappingsTable = warehouseutils.ToProviderCase(job.warehouse.Type, warehouseutils.IdentityMappingsTable) + mergeRulesTable = whutils.ToProviderCase(job.warehouse.Type, whutils.IdentityMergeRulesTable) + mappingsTable = whutils.ToProviderCase(job.warehouse.Type, whutils.IdentityMappingsTable) tu model.TableUpload err error ) @@ -1894,7 +1885,7 @@ func (job *UploadJob) areIdentityTablesLoadFilesGenerated(ctx context.Context) ( return true, nil } -func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehouseutils.GetLoadFilesOptions) (loadFiles []warehouseutils.LoadFile) { +func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options whutils.GetLoadFilesOptions) (loadFiles []whutils.LoadFile) { var tableFilterSQL string if options.Table != "" { tableFilterSQL = fmt.Sprintf(` AND table_name='%s'`, options.Table) @@ -1930,7 +1921,7 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehous row_number = 1 %[4]s; `, - warehouseutils.WarehouseLoadFilesTable, + whutils.WarehouseLoadFilesTable, misc.IntArrayToString(job.stagingFileIDs, ","), tableFilterSQL, limitSQL, @@ 
-1950,7 +1941,7 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehous if err != nil { panic(fmt.Errorf("failed to scan result from query: %s\nwith Error : %w", sqlStatement, err)) } - loadFiles = append(loadFiles, warehouseutils.LoadFile{ + loadFiles = append(loadFiles, whutils.LoadFile{ Location: location, Metadata: metadata, }) @@ -1962,7 +1953,7 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehous } func (job *UploadJob) GetSampleLoadFileLocation(ctx context.Context, tableName string) (location string, err error) { - locations := job.GetLoadFilesMetadata(ctx, warehouseutils.GetLoadFilesOptions{Table: tableName, Limit: 1}) + locations := job.GetLoadFilesMetadata(ctx, whutils.GetLoadFilesOptions{Table: tableName, Limit: 1}) if len(locations) == 0 { return "", fmt.Errorf(`no load file found for table:%s`, tableName) } @@ -1981,20 +1972,20 @@ func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSch } func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema { - return job.schemaHandle.uploadSchema[tableName] + return job.upload.UploadSchema[tableName] } -func (job *UploadJob) GetSingleLoadFile(ctx context.Context, tableName string) (warehouseutils.LoadFile, error) { +func (job *UploadJob) GetSingleLoadFile(ctx context.Context, tableName string) (whutils.LoadFile, error) { var ( tableUpload model.TableUpload err error ) if tableUpload, err = job.tableUploadsRepo.GetByUploadIDAndTableName(ctx, job.upload.ID, tableName); err != nil { - return warehouseutils.LoadFile{}, fmt.Errorf("get single load file: %w", err) + return whutils.LoadFile{}, fmt.Errorf("get single load file: %w", err) } - return warehouseutils.LoadFile{Location: tableUpload.Location}, err + return whutils.LoadFile{Location: tableUpload.Location}, err } func (job *UploadJob) ShouldOnDedupUseNewRecord() bool { @@ -2127,7 +2118,7 @@ func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema } func (job *UploadJob) RefreshPartitions(loadFileStartID, loadFileEndID int64) error { - if !slices.Contains(warehouseutils.TimeWindowDestinations, job.upload.DestinationType) { + if !slices.Contains(whutils.TimeWindowDestinations, job.upload.DestinationType) { return nil } @@ -2142,7 +2133,7 @@ func (job *UploadJob) RefreshPartitions(loadFileStartID, loadFileEndID int64) er // Refresh partitions if exists for tableName := range job.upload.UploadSchema { - loadFiles := job.GetLoadFilesMetadata(job.ctx, warehouseutils.GetLoadFilesOptions{ + loadFiles := job.GetLoadFilesMetadata(job.ctx, whutils.GetLoadFilesOptions{ Table: tableName, StartID: loadFileStartID, EndID: loadFileEndID,
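
In short, this patch moves ownership of the consolidated upload schema from Schema to UploadJob: prepareUploadSchema now returns the schema instead of caching it in sh.uploadSchema, TableSchemaDiff takes the upload's table schema as an argument, and the job persists the schema (via setUploadColumns with UploadSchemaField) before keeping it on upload.UploadSchema. A minimal, self-contained sketch of the resulting call pattern follows; it uses hypothetical, simplified types rather than the rudder-server API, and the consolidation/diff logic is reduced to the bare minimum needed to show the ownership change.

package main

import "fmt"

// Hypothetical, simplified stand-ins for model.TableSchema / model.Schema.
type TableSchema map[string]string
type Schema map[string]TableSchema

// schemaHandle mirrors warehouse.Schema after the change: it keeps only the
// warehouse schema and no longer caches the upload schema.
type schemaHandle struct {
	schemaInWarehouse Schema
}

// prepareUploadSchema now returns the consolidated schema to the caller
// instead of storing it on the handle (inputs simplified for the sketch).
func (sh *schemaHandle) prepareUploadSchema(stagingSchemas []Schema) (Schema, error) {
	consolidated := Schema{}
	for _, staging := range stagingSchemas {
		for table, columns := range staging {
			if _, ok := consolidated[table]; !ok {
				consolidated[table] = TableSchema{}
			}
			for column, columnType := range columns {
				consolidated[table][column] = columnType
			}
		}
	}
	return consolidated, nil
}

// TableSchemaDiff now takes the upload's table schema as an argument rather
// than reading it from a field on the handle.
func (sh *schemaHandle) TableSchemaDiff(tableName string, tableSchema TableSchema) TableSchema {
	diff := TableSchema{}
	current := sh.schemaInWarehouse[tableName]
	for column, columnType := range tableSchema {
		if _, ok := current[column]; !ok {
			diff[column] = columnType
		}
	}
	return diff
}

// uploadJob mirrors UploadJob: the upload schema now lives on the job.
type uploadJob struct {
	schemaHandle *schemaHandle
	uploadSchema Schema
}

func (job *uploadJob) generateUploadSchema(stagingSchemas []Schema) error {
	uploadSchema, err := job.schemaHandle.prepareUploadSchema(stagingSchemas)
	if err != nil {
		return fmt.Errorf("consolidate staging files schema using warehouse schema: %w", err)
	}
	// In the real code the schema is also persisted via setUploadColumns
	// before being assigned to job.upload.UploadSchema.
	job.uploadSchema = uploadSchema
	return nil
}

func main() {
	sh := &schemaHandle{schemaInWarehouse: Schema{"users": {"id": "string"}}}
	job := &uploadJob{schemaHandle: sh}

	staging := []Schema{{"users": {"id": "string", "email": "string"}}}
	if err := job.generateUploadSchema(staging); err != nil {
		panic(err)
	}

	// The diff is computed from the schema held by the job, not by the handle.
	fmt.Println(job.schemaHandle.TableSchemaDiff("users", job.uploadSchema["users"]))
	// prints: map[email:string]
}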