diff --git a/warehouse/identities.go b/warehouse/identities.go index a2737205a7..d22b2e75a6 100644 --- a/warehouse/identities.go +++ b/warehouse/identities.go @@ -427,6 +427,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse dbHandle: wh.dbHandle, pgNotifier: &wh.notifier, destinationValidator: validations.NewDestinationValidator(), + stats: stats.Default, } tableUploadsCreated := areTableUploadsCreated(job.upload.ID) diff --git a/warehouse/slave.go b/warehouse/slave.go index 9620368fe2..9790a9709b 100644 --- a/warehouse/slave.go +++ b/warehouse/slave.go @@ -14,6 +14,8 @@ import ( "strings" "time" + "github.com/rudderlabs/rudder-server/services/stats" + "github.com/rudderlabs/rudder-server/warehouse/model" "github.com/rudderlabs/rudder-server/config" @@ -48,6 +50,7 @@ type JobRunT struct { tableEventCountMap map[string]int stagingFileReader *gzip.Reader whIdentifier string + stats stats.Stats } func (jobRun *JobRunT) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) { @@ -387,6 +390,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l jobRun := JobRunT{ job: job, whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID), + stats: stats.Default, } defer jobRun.counterStat("staging_files_processed", tag{name: "worker_id", value: strconv.Itoa(workerIndex)}).Count(1) @@ -455,7 +459,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l tableName := batchRouterEvent.Metadata.Table columnData := batchRouterEvent.Data - if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountThresholds[warehouseutils.S3_DATALAKE] { + if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountLimitMap[warehouseutils.S3_DATALAKE] { pkgLogger.Errorf("[WH]: Huge staging file columns : columns in upload schema: %v for StagingFileID: %v", len(sortedTableColumnMap[tableName]), job.StagingFileID) return nil, fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", job.StagingFileID, len(sortedTableColumnMap[tableName])) } diff --git a/warehouse/stats.go b/warehouse/stats.go index cd412a128f..6bd17837fe 100644 --- a/warehouse/stats.go +++ b/warehouse/stats.go @@ -44,7 +44,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measuremen for _, extraTag := range extraTags { tags[extraTag.name] = extraTag.value } - return stats.Default.NewTaggedStat(name, stats.TimerType, tags) + return job.stats.NewTaggedStat(name, stats.TimerType, tags) } func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement { @@ -59,7 +59,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurem for _, extraTag := range extraTags { tags[extraTag.name] = extraTag.value } - return stats.Default.NewTaggedStat(name, stats.CountType, tags) + return job.stats.NewTaggedStat(name, stats.CountType, tags) } func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement { @@ -75,7 +75,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measuremen for _, extraTag := range extraTags { tags[extraTag.name] = extraTag.value } - return stats.Default.NewTaggedStat(name, stats.GaugeType, tags) + return job.stats.NewTaggedStat(name, stats.GaugeType, tags) } func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement { @@ -90,7 +90,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measuremen for _, extraTag := range extraTags { tags[extraTag.name] = extraTag.value } - return stats.Default.NewTaggedStat(name, stats.TimerType, tags) + return jobRun.stats.NewTaggedStat(name, stats.TimerType, tags) } func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement { @@ -105,7 +105,7 @@ func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurem for _, extraTag := range extraTags { tags[extraTag.name] = extraTag.value } - return stats.Default.NewTaggedStat(name, stats.CountType, tags) + return jobRun.stats.NewTaggedStat(name, stats.CountType, tags) } func (job *UploadJobT) generateUploadSuccessMetrics() { diff --git a/warehouse/stats_test.go b/warehouse/stats_test.go index f7bf941bb4..8eec36eb9d 100644 --- a/warehouse/stats_test.go +++ b/warehouse/stats_test.go @@ -11,8 +11,6 @@ import ( . "github.com/onsi/gomega" "github.com/ory/dockertest/v3" - backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" - "github.com/rudderlabs/rudder-server/services/stats" "github.com/rudderlabs/rudder-server/testhelper" "github.com/rudderlabs/rudder-server/testhelper/destination" "github.com/rudderlabs/rudder-server/utils/logger" @@ -22,17 +20,11 @@ import ( var _ = Describe("Stats", Ordered, func() { var ( - sourceID = "test-sourceID" - destinationID = "test-destinationID" - destinationType = "test-desinationType" - destinationName = "test-destinationName" - sourceName = "test-sourceName" - statName = "test-statName" - g = GinkgoT() - pgResource *destination.PostgresResource - err error - uploadID = int64(1) - cleanup = &testhelper.Cleanup{} + g = GinkgoT() + pgResource *destination.PostgresResource + err error + uploadID = int64(1) + cleanup = &testhelper.Cleanup{} ) BeforeAll(func() { @@ -59,39 +51,6 @@ var _ = Describe("Stats", Ordered, func() { cleanup.Run() }) - BeforeEach(func() { - defaultStats := stats.Default - - DeferCleanup(func() { - stats.Default = defaultStats - }) - }) - - Describe("Jobs stats", func() { - BeforeEach(func() { - mockStats, mockMeasurement := getMockStats(g) - mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement) - mockMeasurement.EXPECT().Count(gomock.Any()).AnyTimes() - - stats.Default = mockStats - }) - - It("Upload status stat", func() { - getUploadStatusStat(statName, warehouseutils.Warehouse{ - WorkspaceID: "workspaceID", - Source: backendconfig.SourceT{ID: sourceID, Name: sourceName}, - Destination: backendconfig.DestinationT{ID: destinationID, Name: destinationName}, - Namespace: "", - Type: destinationType, - Identifier: "", - }) - }) - - It("Persist ssl file error stat", func() { - persistSSLFileErrorStat("workspaceID", destinationType, destinationName, destinationID, sourceName, sourceID, "") - }) - }) - Describe("Generate upload success/aborted metrics", func() { var job *UploadJobT @@ -101,8 +60,6 @@ var _ = Describe("Stats", Ordered, func() { mockMeasurement.EXPECT().Count(4).Times(2) mockMeasurement.EXPECT().Count(1).Times(1) - stats.Default = mockStats - job = &UploadJobT{ upload: &Upload{ ID: uploadID, @@ -112,6 +69,7 @@ var _ = Describe("Stats", Ordered, func() { warehouse: warehouseutils.Warehouse{ Type: "POSTGRES", }, + stats: mockStats, } }) @@ -130,8 +88,6 @@ var _ = Describe("Stats", Ordered, func() { mockMeasurement.EXPECT().Count(4).Times(2) mockMeasurement.EXPECT().Since(gomock.Any()).Times(1) - stats.Default = mockStats - job := &UploadJobT{ upload: &Upload{ WorkspaceID: "workspaceID", @@ -142,6 +98,7 @@ var _ = Describe("Stats", Ordered, func() { warehouse: warehouseutils.Warehouse{ Type: "POSTGRES", }, + stats: mockStats, } job.recordTableLoad("tracks", 4) }) @@ -151,8 +108,6 @@ var _ = Describe("Stats", Ordered, func() { mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement) mockMeasurement.EXPECT().SendTiming(gomock.Any()).Times(1) - stats.Default = mockStats - job := &UploadJobT{ upload: &Upload{ ID: uploadID, @@ -163,6 +118,7 @@ var _ = Describe("Stats", Ordered, func() { Type: "POSTGRES", }, dbHandle: pgResource.DB, + stats: mockStats, } err = job.recordLoadFileGenerationTimeStat(1, 4) diff --git a/warehouse/upload.go b/warehouse/upload.go index 224dee90bb..d155b0aba1 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -119,6 +119,7 @@ type UploadJobT struct { hasAllTablesSkipped bool tableUploadStatuses []*TableUploadStatusT destinationValidator validations.DestinationValidator + stats stats.Stats } type UploadColumnT struct { @@ -145,8 +146,8 @@ var ( ) var ( - maxParallelLoads map[string]int - columnCountThresholds map[string]int + maxParallelLoads map[string]int + columnCountLimitMap map[string]int ) func Init() { @@ -167,15 +168,15 @@ func setMaxParallelLoads() { warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3), warehouseutils.DELTALAKE: config.GetInt("Warehouse.deltalake.maxParallelLoads", 3), } - columnCountThresholds = map[string]int{ - warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800), - warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000), - warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800), - warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountThreshold", 800), - warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountThreshold", 1200), - warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountThreshold", 1200), - warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600), - warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000), + columnCountLimitMap = map[string]int{ + warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024), + warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountLimit", 10000), + warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000), + warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountLimit", 1024), + warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountLimit", 1600), + warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountLimit", 1600), + warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountLimit", 5000), + warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000), } } @@ -204,7 +205,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} { case <-time.After(longRunningUploadStatThresholdInMin): pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier) - stats.Default.NewTaggedStat( + job.stats.NewTaggedStat( "warehouse.long_running_upload", stats.CountType, stats.Tags{ @@ -1067,10 +1068,25 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) { job.recordTableLoad(tName, numEvents) } - if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok { - columnCount := len(job.schemaHandle.schemaInWarehouse[tName]) - if columnCount > columnThreshold { - job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount) + job.columnCountStat(tName) + + return +} + +func (job *UploadJobT) columnCountStat(tableName string) { + if columnCountLimit, ok := columnCountLimitMap[job.warehouse.Type]; ok { + currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName]) + if currentColumnsCount > int(float64(columnCountLimit)*columnCountLimitThreshold) { + tags := []tag{ + { + name: "tableName", value: strings.ToLower(tableName), + }, + { + name: "columnCountLimit", value: strconv.Itoa(columnCountLimit), + }, + } + + job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount) } } return diff --git a/warehouse/upload_test.go b/warehouse/upload_test.go index 858168e060..685e0e4336 100644 --- a/warehouse/upload_test.go +++ b/warehouse/upload_test.go @@ -8,7 +8,7 @@ import ( "os" "testing" - "github.com/rudderlabs/rudder-server/services/stats" + mock_stats "github.com/rudderlabs/rudder-server/mocks/services/stats" "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" @@ -76,6 +76,85 @@ func TestExtractUploadErrorsByState(t *testing.T) { } } +func TestColumnCountStat(t *testing.T) { + Init() + Init4() + + inputs := []struct { + name string + columnCountLimit int + destinationType string + statExpected bool + }{ + { + name: "less than threshold", + destinationType: "test-destination", + columnCountLimit: 1, + statExpected: true, + }, + { + name: "greater than threshold", + destinationType: "test-destination", + columnCountLimit: 10, + }, + { + name: "unknown destination", + destinationType: "unknwon-destination", + }, + } + + ctrl := gomock.NewController(t) + mockStats := mock_stats.NewMockStats(ctrl) + mockMeasurement := mock_stats.NewMockMeasurement(ctrl) + + for _, tc := range inputs { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + columnCountLimitMap = map[string]int{ + "test-destination": tc.columnCountLimit, + } + + if tc.statExpected { + mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement) + mockMeasurement.EXPECT().Count(3).Times(1) + } else { + mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(0).Return(mockMeasurement) + } + + j := UploadJobT{ + upload: &Upload{ + WorkspaceID: "test-workspaceID", + DestinationID: "test-desinationID", + SourceID: "test-sourceID", + }, + warehouse: warehouseutils.Warehouse{ + Type: tc.destinationType, + Destination: backendconfig.DestinationT{ + ID: "test-desinationID", + Name: "test-desinationName", + }, + Source: backendconfig.SourceT{ + ID: "test-sourceID", + Name: "test-sourceName", + }, + }, + stats: mockStats, + schemaHandle: &SchemaHandleT{ + schemaInWarehouse: warehouseutils.SchemaT{ + "test-table": map[string]string{ + "test-column-1": "string", + "test-column-2": "string", + "test-column-3": "string", + }, + }, + }, + } + j.columnCountStat("test-table") + }) + } +} + var _ = Describe("Upload", Ordered, func() { var ( sourceID = "test-sourceID" @@ -208,14 +287,6 @@ var _ = Describe("Upload", Ordered, func() { }) Describe("Staging files and load files events match", func() { - BeforeEach(func() { - defaultStats := stats.Default - - DeferCleanup(func() { - stats.Default = defaultStats - }) - }) - When("Matched", func() { It("Should not send stats", func() { job.matchRowsInStagingAndLoadFiles() @@ -228,8 +299,7 @@ var _ = Describe("Upload", Ordered, func() { mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement) mockMeasurement.EXPECT().Gauge(gomock.Any()).Times(1) - stats.Default = mockStats - + job.stats = mockStats job.stagingFileIDs = []int64{1, 2} job.matchRowsInStagingAndLoadFiles() }) diff --git a/warehouse/warehouse.go b/warehouse/warehouse.go index 87c2cc4efa..e056dac96f 100644 --- a/warehouse/warehouse.go +++ b/warehouse/warehouse.go @@ -94,6 +94,7 @@ var ( maxParallelJobCreation int enableJitterForSyncs bool configBackendURL string + columnCountLimitThreshold float64 asyncWh *jobs.AsyncJobWhT ) @@ -198,6 +199,7 @@ func loadConfig() { config.RegisterIntConfigVariable(8, &maxParallelJobCreation, true, 1, "Warehouse.maxParallelJobCreation") config.RegisterBoolConfigVariable(false, &enableJitterForSyncs, true, "Warehouse.enableJitterForSyncs") config.RegisterDurationConfigVariable(30, &tableCountQueryTimeout, true, time.Second, []string{"Warehouse.tableCountQueryTimeout", "Warehouse.tableCountQueryTimeoutInS"}...) + config.RegisterFloat64ConfigVariable(0.8, &columnCountLimitThreshold, true, "Warehouse.columnCountLimitThreshold") appName = misc.DefaultString("rudder-server").OnError(os.Hostname()) configBackendURL = config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com") @@ -1055,6 +1057,7 @@ func (wh *HandleT) getUploadsToProcess(availableWorkers int, skipIdentifiers []s uploadJob := UploadJobT{ upload: &upload, dbHandle: wh.dbHandle, + stats: stats.Default, } err := fmt.Errorf("unable to find source : %s or destination : %s, both or the connection between them", upload.SourceID, upload.DestinationID) _, _ = uploadJob.setUploadError(err, model.Aborted) @@ -1088,6 +1091,7 @@ func (wh *HandleT) getUploadsToProcess(availableWorkers int, skipIdentifiers []s dbHandle: wh.dbHandle, pgNotifier: &wh.notifier, destinationValidator: validations.NewDestinationValidator(), + stats: stats.Default, } uploadJobs = append(uploadJobs, &uploadJob)