Skip to content

Commit

Permalink
feat(warehouse): added support for warehouse column count limit
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 23, 2022
1 parent 4f0d189 commit 8b40519
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 85 deletions.
1 change: 1 addition & 0 deletions warehouse/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
dbHandle: wh.dbHandle,
pgNotifier: &wh.notifier,
destinationValidator: validations.NewDestinationValidator(),
stats: stats.Default,
}

tableUploadsCreated := areTableUploadsCreated(job.upload.ID)
Expand Down
6 changes: 5 additions & 1 deletion warehouse/slave.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/services/stats"

"github.com/rudderlabs/rudder-server/warehouse/model"

"github.com/rudderlabs/rudder-server/config"
Expand Down Expand Up @@ -48,6 +50,7 @@ type JobRunT struct {
tableEventCountMap map[string]int
stagingFileReader *gzip.Reader
whIdentifier string
stats stats.Stats
}

func (jobRun *JobRunT) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) {
Expand Down Expand Up @@ -387,6 +390,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
jobRun := JobRunT{
job: job,
whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stats.Default,
}

defer jobRun.counterStat("staging_files_processed", tag{name: "worker_id", value: strconv.Itoa(workerIndex)}).Count(1)
Expand Down Expand Up @@ -455,7 +459,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
tableName := batchRouterEvent.Metadata.Table
columnData := batchRouterEvent.Data

if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountThresholds[warehouseutils.S3_DATALAKE] {
if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountLimitMap[warehouseutils.S3_DATALAKE] {
pkgLogger.Errorf("[WH]: Huge staging file columns : columns in upload schema: %v for StagingFileID: %v", len(sortedTableColumnMap[tableName]), job.StagingFileID)
return nil, fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", job.StagingFileID, len(sortedTableColumnMap[tableName]))
}
Expand Down
10 changes: 5 additions & 5 deletions warehouse/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return job.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement {
Expand All @@ -59,7 +59,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurem
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return job.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement {
Expand All @@ -75,7 +75,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.GaugeType, tags)
return job.stats.NewTaggedStat(name, stats.GaugeType, tags)
}

func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement {
Expand All @@ -90,7 +90,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return jobRun.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement {
Expand All @@ -105,7 +105,7 @@ func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurem
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return jobRun.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) generateUploadSuccessMetrics() {
Expand Down
60 changes: 8 additions & 52 deletions warehouse/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
. "github.com/onsi/gomega"

"github.com/ory/dockertest/v3"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/utils/logger"
Expand All @@ -22,17 +20,11 @@ import (

var _ = Describe("Stats", Ordered, func() {
var (
sourceID = "test-sourceID"
destinationID = "test-destinationID"
destinationType = "test-desinationType"
destinationName = "test-destinationName"
sourceName = "test-sourceName"
statName = "test-statName"
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
)

BeforeAll(func() {
Expand All @@ -59,39 +51,6 @@ var _ = Describe("Stats", Ordered, func() {
cleanup.Run()
})

BeforeEach(func() {
defaultStats := stats.Default

DeferCleanup(func() {
stats.Default = defaultStats
})
})

Describe("Jobs stats", func() {
BeforeEach(func() {
mockStats, mockMeasurement := getMockStats(g)
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().Count(gomock.Any()).AnyTimes()

stats.Default = mockStats
})

It("Upload status stat", func() {
getUploadStatusStat(statName, warehouseutils.Warehouse{
WorkspaceID: "workspaceID",
Source: backendconfig.SourceT{ID: sourceID, Name: sourceName},
Destination: backendconfig.DestinationT{ID: destinationID, Name: destinationName},
Namespace: "",
Type: destinationType,
Identifier: "",
})
})

It("Persist ssl file error stat", func() {
persistSSLFileErrorStat("workspaceID", destinationType, destinationName, destinationID, sourceName, sourceID, "")
})
})

Describe("Generate upload success/aborted metrics", func() {
var job *UploadJobT

Expand All @@ -101,8 +60,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Count(1).Times(1)

stats.Default = mockStats

job = &UploadJobT{
upload: &Upload{
ID: uploadID,
Expand All @@ -112,6 +69,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
})

Expand All @@ -130,8 +88,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Since(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
WorkspaceID: "workspaceID",
Expand All @@ -142,6 +98,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
job.recordTableLoad("tracks", 4)
})
Expand All @@ -151,8 +108,6 @@ var _ = Describe("Stats", Ordered, func() {
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().SendTiming(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
ID: uploadID,
Expand All @@ -163,6 +118,7 @@ var _ = Describe("Stats", Ordered, func() {
Type: "POSTGRES",
},
dbHandle: pgResource.DB,
stats: mockStats,
}

err = job.recordLoadFileGenerationTimeStat(1, 4)
Expand Down
48 changes: 32 additions & 16 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type UploadJobT struct {
hasAllTablesSkipped bool
tableUploadStatuses []*TableUploadStatusT
destinationValidator validations.DestinationValidator
stats stats.Stats
}

type UploadColumnT struct {
Expand All @@ -145,8 +146,8 @@ var (
)

var (
maxParallelLoads map[string]int
columnCountThresholds map[string]int
maxParallelLoads map[string]int
columnCountLimitMap map[string]int
)

func Init() {
Expand All @@ -167,15 +168,15 @@ func setMaxParallelLoads() {
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3),
warehouseutils.DELTALAKE: config.GetInt("Warehouse.deltalake.maxParallelLoads", 3),
}
columnCountThresholds = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountThreshold", 800),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountThreshold", 1200),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountThreshold", 1200),
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600),
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000),
columnCountLimitMap = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountLimit", 1024),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountLimit", 1600),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountLimit", 1600),
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountLimit", 5000),
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
}
}

Expand Down Expand Up @@ -204,7 +205,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} {
case <-time.After(longRunningUploadStatThresholdInMin):
pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier)

stats.Default.NewTaggedStat(
job.stats.NewTaggedStat(
"warehouse.long_running_upload",
stats.CountType,
stats.Tags{
Expand Down Expand Up @@ -1067,10 +1068,25 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) {
job.recordTableLoad(tName, numEvents)
}

if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok {
columnCount := len(job.schemaHandle.schemaInWarehouse[tName])
if columnCount > columnThreshold {
job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount)
job.columnCountStat(tName)

return
}

func (job *UploadJobT) columnCountStat(tableName string) {
if columnCountLimit, ok := columnCountLimitMap[job.warehouse.Type]; ok {
currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])
if currentColumnsCount > int(float64(columnCountLimit)*columnCountLimitThreshold) {
tags := []tag{
{
name: "tableName", value: strings.ToLower(tableName),
},
{
name: "columnCountLimit", value: strconv.Itoa(columnCountLimit),
},
}

job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
}
}
return
Expand Down
Loading

0 comments on commit 8b40519

Please sign in to comment.