diff --git a/warehouse/constraint.go b/warehouse/constraint.go index f118d008bd..df5aa01f36 100644 --- a/warehouse/constraint.go +++ b/warehouse/constraint.go @@ -28,7 +28,7 @@ type indexConstraint struct { type constraintsManager struct { constraintsMap map[string][]constraints - enableConstraintsViolations bool + enableConstraintsViolations misc.ValueLoader[bool] } func newConstraintsManager(conf *config.Config) *constraintsManager { @@ -64,9 +64,7 @@ func newConstraintsManager(conf *config.Config) *constraintsManager { }, }, } - - // nolint:staticcheck // SA1019: config Register reloadable functions are deprecated - conf.RegisterBoolConfigVariable(true, &cm.enableConstraintsViolations, true, "Warehouse.enableConstraintsViolations") + cm.enableConstraintsViolations = conf.GetReloadableBoolVar(true, "Warehouse.enableConstraintsViolations") return cm } @@ -74,7 +72,7 @@ func newConstraintsManager(conf *config.Config) *constraintsManager { func (cm *constraintsManager) violatedConstraints(destinationType string, brEvent *BatchRouterEvent, columnName string) (cv *constraintsViolation) { cv = &constraintsViolation{} - if !cm.enableConstraintsViolations { + if !cm.enableConstraintsViolations.Load() { return } diff --git a/warehouse/router.go b/warehouse/router.go index 907692d725..f227639f2c 100644 --- a/warehouse/router.go +++ b/warehouse/router.go @@ -93,19 +93,19 @@ type router struct { notifier *notifier.Notifier config struct { - uploadFreqInS int64 - noOfWorkers int maxConcurrentUploadJobs int allowMultipleSourcesForJobsPickup bool - enableJitterForSyncs bool - maxParallelJobCreation int waitForWorkerSleep time.Duration uploadAllocatorSleep time.Duration - mainLoopSleep time.Duration - stagingFilesBatchSize int uploadStatusTrackFrequency time.Duration - warehouseSyncFreqIgnore bool shouldPopulateHistoricIdentities bool + uploadFreqInS misc.ValueLoader[int64] + noOfWorkers misc.ValueLoader[int] + enableJitterForSyncs misc.ValueLoader[bool] + maxParallelJobCreation misc.ValueLoader[int] + mainLoopSleep misc.ValueLoader[time.Duration] + stagingFilesBatchSize misc.ValueLoader[int] + warehouseSyncFreqIgnore misc.ValueLoader[bool] } stats struct { @@ -189,7 +189,13 @@ func newRouter( r.config.uploadStatusTrackFrequency = r.conf.GetDurationVar(30, time.Minute, "Warehouse.uploadStatusTrackFrequency", "Warehouse.uploadStatusTrackFrequencyInMin") r.config.allowMultipleSourcesForJobsPickup = r.conf.GetBoolVar(false, fmt.Sprintf(`Warehouse.%v.allowMultipleSourcesForJobsPickup`, whName)) r.config.shouldPopulateHistoricIdentities = r.conf.GetBoolVar(false, "Warehouse.populateHistoricIdentities") - r.setupReloadableVars(whName) + r.config.uploadFreqInS = r.conf.GetReloadableInt64Var(1800, 1, "Warehouse.uploadFreqInS") + r.config.noOfWorkers = r.conf.GetReloadableIntVar(8, 1, fmt.Sprintf(`Warehouse.%v.noOfWorkers`, whName), "Warehouse.noOfWorkers") + r.config.maxParallelJobCreation = r.conf.GetReloadableIntVar(8, 1, "Warehouse.maxParallelJobCreation") + r.config.mainLoopSleep = r.conf.GetReloadableDurationVar(5, time.Second, "Warehouse.mainLoopSleep", "Warehouse.mainLoopSleepInS") + r.config.stagingFilesBatchSize = r.conf.GetReloadableIntVar(960, 1, "Warehouse.stagingFilesBatchSize") + r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs") + r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore") r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, stats.Tags{ "destType": r.destType, @@ -236,17 +242,6 @@ func newRouter( return r, nil } -// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated -func (r *router) setupReloadableVars(whName string) { - r.conf.RegisterInt64ConfigVariable(1800, &r.config.uploadFreqInS, true, 1, "Warehouse.uploadFreqInS") - r.conf.RegisterIntConfigVariable(8, &r.config.noOfWorkers, true, 1, fmt.Sprintf(`Warehouse.%v.noOfWorkers`, whName), "Warehouse.noOfWorkers") - r.conf.RegisterIntConfigVariable(8, &r.config.maxParallelJobCreation, true, 1, "Warehouse.maxParallelJobCreation") - r.conf.RegisterDurationConfigVariable(5, &r.config.mainLoopSleep, true, time.Second, "Warehouse.mainLoopSleep", "Warehouse.mainLoopSleepInS") - r.conf.RegisterIntConfigVariable(960, &r.config.stagingFilesBatchSize, true, 1, "Warehouse.stagingFilesBatchSize") - r.conf.RegisterBoolConfigVariable(false, &r.config.enableJitterForSyncs, true, "Warehouse.enableJitterForSyncs") - r.conf.RegisterBoolConfigVariable(false, &r.config.warehouseSyncFreqIgnore, true, "Warehouse.warehouseSyncFreqIgnore") -} - // Backend Config subscriber subscribes to backend-config and gets all the configurations that includes all sources, destinations and their latest values. func (r *router) backendConfigSubscriber(ctx context.Context) { for warehouses := range r.bcManager.Subscribe(ctx) { @@ -397,7 +392,7 @@ loop: r.logger.Debugf("Initial config fetched in runUploadJobAllocator for %s", r.destType) } - availableWorkers := r.config.noOfWorkers - r.getActiveWorkerCount() + availableWorkers := r.config.noOfWorkers.Load() - r.getActiveWorkerCount() if availableWorkers < 1 { select { case <-ctx.Done(): @@ -537,12 +532,12 @@ func (r *router) mainLoop(ctx context.Context) { select { case <-ctx.Done(): return - case <-time.After(r.config.mainLoopSleep): + case <-time.After(r.config.mainLoopSleep.Load()): } continue } - jobCreationChan := make(chan struct{}, r.config.maxParallelJobCreation) + jobCreationChan := make(chan struct{}, r.config.maxParallelJobCreation.Load()) r.configSubscriberLock.RLock() @@ -580,7 +575,7 @@ func (r *router) mainLoop(ctx context.Context) { select { case <-ctx.Done(): return - case <-time.After(r.config.mainLoopSleep): + case <-time.After(r.config.mainLoopSleep.Load()): } } } @@ -667,7 +662,7 @@ func (r *router) handlePriorityForWaitingUploads(ctx context.Context, warehouse } func (r *router) uploadStartAfterTime() time.Time { - if r.config.enableJitterForSyncs { + if r.config.enableJitterForSyncs.Load() { return timeutil.Now().Add(time.Duration(rand.Intn(15)) * time.Second) } return r.now() @@ -683,7 +678,7 @@ func (r *router) createUploadJobsFromStagingFiles(ctx context.Context, warehouse priority = 50 } - batches := service.StageFileBatching(stagingFiles, r.config.stagingFilesBatchSize) + batches := service.StageFileBatching(stagingFiles, r.config.stagingFilesBatchSize.Load()) for _, batch := range batches { upload := model.Upload{ SourceID: warehouse.Source.ID, @@ -736,7 +731,7 @@ func (r *router) uploadFrequencyExceeded(warehouse model.Warehouse, syncFrequenc func (r *router) uploadFreqInS(syncFrequency string) int64 { freqInMin, err := strconv.ParseInt(syncFrequency, 10, 64) if err != nil { - return r.config.uploadFreqInS + return r.config.uploadFreqInS.Load() } return freqInMin * 60 } diff --git a/warehouse/router_scheduling.go b/warehouse/router_scheduling.go index 0ad2eb8937..d1e25d62ae 100644 --- a/warehouse/router_scheduling.go +++ b/warehouse/router_scheduling.go @@ -39,7 +39,7 @@ func (r *router) canCreateUpload(ctx context.Context, warehouse model.Warehouse) return true, nil } - if r.config.warehouseSyncFreqIgnore { + if r.config.warehouseSyncFreqIgnore.Load() { if r.uploadFrequencyExceeded(warehouse, "") { return true, nil } diff --git a/warehouse/router_scheduling_test.go b/warehouse/router_scheduling_test.go index 92a27ec3b3..0353760805 100644 --- a/warehouse/router_scheduling_test.go +++ b/warehouse/router_scheduling_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/ory/dockertest/v3" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" @@ -205,7 +207,8 @@ func TestRouter_CanCreateUpload(t *testing.T) { } r := router{} - r.config.warehouseSyncFreqIgnore = true + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) r.triggerStore = &sync.Map{} canCreate, err := r.canCreateUpload(context.Background(), w) @@ -224,8 +227,8 @@ func TestRouter_CanCreateUpload(t *testing.T) { r.now = func() time.Time { return now } - r.config.uploadFreqInS = 1800 - r.config.warehouseSyncFreqIgnore = true + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) r.createJobMarkerMap = make(map[string]time.Time) r.triggerStore = &sync.Map{} @@ -247,8 +250,8 @@ func TestRouter_CanCreateUpload(t *testing.T) { r.now = func() time.Time { return now } - r.config.uploadFreqInS = 1800 - r.config.warehouseSyncFreqIgnore = true + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) r.createJobMarkerMap = make(map[string]time.Time) r.triggerStore = &sync.Map{} @@ -275,6 +278,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { r := router{} r.triggerStore = &sync.Map{} + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false) r.now = func() time.Time { return time.Date(2009, time.November, 10, 5, 30, 0, 0, time.UTC) } @@ -298,7 +302,8 @@ func TestRouter_CanCreateUpload(t *testing.T) { r.now = func() time.Time { return now } - r.config.uploadFreqInS = 1800 + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false) + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) r.createJobMarkerMap = make(map[string]time.Time) r.triggerStore = &sync.Map{} @@ -323,7 +328,8 @@ func TestRouter_CanCreateUpload(t *testing.T) { r.now = func() time.Time { return now } - r.config.uploadFreqInS = 1800 + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false) + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) r.triggerStore = &sync.Map{} r.createJobMarkerMap = make(map[string]time.Time) @@ -413,6 +419,7 @@ func TestRouter_CanCreateUpload(t *testing.T) { r := router{} r.triggerStore = &sync.Map{} + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false) r.createJobMarkerMap = make(map[string]time.Time) r.uploadRepo = repoUpload r.now = func() time.Time { diff --git a/warehouse/router_test.go b/warehouse/router_test.go index f3f8b14238..69359c46d8 100644 --- a/warehouse/router_test.go +++ b/warehouse/router_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/rudderlabs/rudder-server/services/notifier" "github.com/samber/lo" @@ -194,9 +196,10 @@ func TestRouter(t *testing.T) { r.stagingRepo = repoStaging r.statsFactory = memstats.New() r.conf = config.Default - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.enableJitterForSyncs = true + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.enableJitterForSyncs = misc.SingleValueLoader(true) r.destType = destinationType r.logger = logger.NOP r.triggerStore = &sync.Map{} @@ -349,9 +352,9 @@ func TestRouter(t *testing.T) { r.stagingRepo = repoStaging r.statsFactory = memstats.New() r.conf = config.Default - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.enableJitterForSyncs = true + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.enableJitterForSyncs = misc.SingleValueLoader(true) r.destType = destinationType r.inProgressMap = make(map[workerIdentifierMapKey][]jobID) r.triggerStore = &sync.Map{} @@ -482,11 +485,13 @@ func TestRouter(t *testing.T) { r.uploadRepo = repoUpload r.stagingRepo = repoStaging r.conf = config.Default - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.enableJitterForSyncs = true - r.config.mainLoopSleep = time.Millisecond * 100 - r.config.maxParallelJobCreation = 100 + r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800)) + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.enableJitterForSyncs = misc.SingleValueLoader(true) + r.config.enableJitterForSyncs = misc.SingleValueLoader(true) + r.config.mainLoopSleep = misc.SingleValueLoader(time.Millisecond * 100) + r.config.maxParallelJobCreation = misc.SingleValueLoader(100) r.destType = destinationType r.logger = logger.NOP r.now = func() time.Time { @@ -587,8 +592,8 @@ func TestRouter(t *testing.T) { r.statsFactory = memstats.New() r.conf = config.Default r.config.allowMultipleSourcesForJobsPickup = true - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) r.destType = destinationType r.logger = logger.NOP r.tenantManager = multitenant.New(config.Default, mocksBackendConfig.NewMockBackendConfig(ctrl)) @@ -720,9 +725,9 @@ func TestRouter(t *testing.T) { r.statsFactory = memstats.New() r.conf = config.Default r.config.allowMultipleSourcesForJobsPickup = true - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.noOfWorkers = 10 + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.noOfWorkers = misc.SingleValueLoader(10) r.config.waitForWorkerSleep = time.Millisecond * 100 r.config.uploadAllocatorSleep = time.Millisecond * 100 r.destType = warehouseutils.RS @@ -869,9 +874,9 @@ func TestRouter(t *testing.T) { r.statsFactory = memstats.New() r.conf = config.Default r.config.allowMultipleSourcesForJobsPickup = true - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.noOfWorkers = 0 + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.noOfWorkers = misc.SingleValueLoader(0) r.config.waitForWorkerSleep = time.Second * 5 r.config.uploadAllocatorSleep = time.Millisecond * 100 r.destType = warehouseutils.RS @@ -957,11 +962,12 @@ func TestRouter(t *testing.T) { r.uploadRepo = repoUpload r.stagingRepo = repoStaging r.conf = config.Default - r.config.stagingFilesBatchSize = 100 - r.config.warehouseSyncFreqIgnore = true - r.config.enableJitterForSyncs = true - r.config.mainLoopSleep = time.Second * 5 - r.config.maxParallelJobCreation = 100 + + r.config.stagingFilesBatchSize = misc.SingleValueLoader(100) + r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true) + r.config.enableJitterForSyncs = misc.SingleValueLoader(true) + r.config.mainLoopSleep = misc.SingleValueLoader(time.Second * 5) + r.config.maxParallelJobCreation = misc.SingleValueLoader(100) r.destType = destinationType r.logger = logger.NOP r.now = func() time.Time { diff --git a/warehouse/slave.go b/warehouse/slave.go index 362809e6d1..09aab23e05 100644 --- a/warehouse/slave.go +++ b/warehouse/slave.go @@ -32,7 +32,7 @@ type slave struct { encodingFactory *encoding.Factory config struct { - noOfSlaveWorkerRoutines int + noOfSlaveWorkerRoutines misc.ValueLoader[int] } } @@ -54,9 +54,7 @@ func newSlave( s.bcManager = bcManager s.constraintsManager = constraintsManager s.encodingFactory = encodingFactory - - // nolint:staticcheck // SA1019: config Register reloadable functions are deprecated - conf.RegisterIntConfigVariable(4, &s.config.noOfSlaveWorkerRoutines, true, 1, "Warehouse.noOfSlaveWorkerRoutines") + s.config.noOfSlaveWorkerRoutines = conf.GetReloadableIntVar(4, 1, "Warehouse.noOfSlaveWorkerRoutines") return s } @@ -64,11 +62,11 @@ func newSlave( func (s *slave) setupSlave(ctx context.Context) error { slaveID := misc.FastUUID().String() - jobNotificationChannel := s.notifier.Subscribe(ctx, slaveID, s.config.noOfSlaveWorkerRoutines) + jobNotificationChannel := s.notifier.Subscribe(ctx, slaveID, s.config.noOfSlaveWorkerRoutines.Load()) g, gCtx := errgroup.WithContext(ctx) - for workerIdx := 0; workerIdx <= s.config.noOfSlaveWorkerRoutines-1; workerIdx++ { + for workerIdx := 0; workerIdx <= s.config.noOfSlaveWorkerRoutines.Load()-1; workerIdx++ { idx := workerIdx g.Go(misc.WithBugsnagForWarehouse(func() error { diff --git a/warehouse/slave_test.go b/warehouse/slave_test.go index ee444c87b9..9590b8ce15 100644 --- a/warehouse/slave_test.go +++ b/warehouse/slave_test.go @@ -87,7 +87,7 @@ func TestSlave(t *testing.T) { subscribeCh: subscriberCh, } - workers := 4 + workers := misc.SingleValueLoader(4) workerJobs := 25 tenantManager := multitenant.New( diff --git a/warehouse/slave_worker.go b/warehouse/slave_worker.go index 790b365ded..cbb9b55fa9 100644 --- a/warehouse/slave_worker.go +++ b/warehouse/slave_worker.go @@ -11,6 +11,8 @@ import ( "strconv" "time" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/rudderlabs/rudder-server/services/notifier" "github.com/rudderlabs/rudder-go-kit/logger" @@ -61,7 +63,7 @@ type slaveWorker struct { workerIdx int config struct { - maxStagingFileReadBufferCapacityInK int + maxStagingFileReadBufferCapacityInK misc.ValueLoader[int] } stats struct { workerIdleTime stats.Measurement @@ -92,8 +94,7 @@ func newSlaveWorker( s.encodingFactory = encodingFactory s.workerIdx = workerIdx - // nolint:staticcheck // SA1019: config Register reloadable functions are deprecated - conf.RegisterIntConfigVariable(10240, &s.config.maxStagingFileReadBufferCapacityInK, true, 1, "Warehouse.maxStagingFileReadBufferCapacityInK") + s.config.maxStagingFileReadBufferCapacityInK = s.conf.GetReloadableIntVar(10240, 1, "Warehouse.maxStagingFileReadBufferCapacityInK") tags := stats.Tags{ "module": moduleName, @@ -243,7 +244,7 @@ func (sw *slaveWorker) processStagingFile(ctx context.Context, job payload) ([]u // default scanner buffer maxCapacity is 64K // set it to higher value to avoid read stop on read size error - maxCapacity := sw.config.maxStagingFileReadBufferCapacityInK * 1024 + maxCapacity := sw.config.maxStagingFileReadBufferCapacityInK.Load() * 1024 bufScanner := bufio.NewScanner(jr.stagingFileReader) bufScanner.Buffer(make([]byte, maxCapacity), maxCapacity) diff --git a/warehouse/slave_worker_job.go b/warehouse/slave_worker_job.go index acf8c8bf0b..b3e898f39d 100644 --- a/warehouse/slave_worker_job.go +++ b/warehouse/slave_worker_job.go @@ -135,11 +135,7 @@ func newJobRun(job payload, conf *config.Config, log logger.Logger, stat stats.S encodingFactory: encodingFactory, } - if conf.IsSet("Warehouse.slaveUploadTimeout") { - jr.config.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeout", 10, time.Minute) - } else { - jr.config.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute) - } + jr.config.slaveUploadTimeout = conf.GetDurationVar(10, time.Minute, "Warehouse.slaveUploadTimeout", "Warehouse.slaveUploadTimeoutInMin") jr.config.numLoadFileUploadWorkers = conf.GetInt("Warehouse.numLoadFileUploadWorkers", 8) jr.config.loadObjectFolder = conf.GetString("WAREHOUSE_BUCKET_LOAD_OBJECTS_FOLDER_NAME", "rudder-warehouse-load-objects") diff --git a/warehouse/upload.go b/warehouse/upload.go index a371af8811..93f52017e0 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -225,32 +225,11 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo uj.config.disableGenerateTableLoadCountMetricsWorkspaceIDs = f.conf.GetStringSlice("Warehouse.disableGenerateTableLoadCountMetricsWorkspaceIDs", nil) uj.config.columnsBatchSize = f.conf.GetInt(fmt.Sprintf("Warehouse.%s.columnsBatchSize", whutils.WHDestNameMap[uj.upload.DestinationType]), 100) uj.config.maxParallelLoadsWorkspaceIDs = f.conf.GetStringMap(fmt.Sprintf("Warehouse.%s.maxParallelLoadsWorkspaceIDs", whutils.WHDestNameMap[uj.upload.DestinationType]), nil) - - if f.conf.IsSet("Warehouse.tableCountQueryTimeout") { - uj.config.tableCountQueryTimeout = f.conf.GetDuration("Warehouse.tableCountQueryTimeout", 30, time.Second) - } else { - uj.config.tableCountQueryTimeout = f.conf.GetDuration("Warehouse.tableCountQueryTimeoutInS", 30, time.Second) - } - if f.conf.IsSet("Warehouse.longRunningUploadStatThreshold") { - uj.config.longRunningUploadStatThresholdInMin = f.conf.GetDuration("Warehouse.longRunningUploadStatThreshold", 120, time.Minute) - } else { - uj.config.longRunningUploadStatThresholdInMin = f.conf.GetDuration("Warehouse.longRunningUploadStatThresholdInMin", 120, time.Minute) - } - if f.conf.IsSet("Warehouse.minUploadBackoff") { - uj.config.minUploadBackoff = f.conf.GetDuration("Warehouse.minUploadBackoff", 60, time.Second) - } else { - uj.config.minUploadBackoff = f.conf.GetDuration("Warehouse.minUploadBackoffInS", 60, time.Second) - } - if f.conf.IsSet("Warehouse.maxUploadBackoff") { - uj.config.maxUploadBackoff = f.conf.GetDuration("Warehouse.maxUploadBackoff", 1800, time.Second) - } else { - uj.config.maxUploadBackoff = f.conf.GetDuration("Warehouse.maxUploadBackoffInS", 1800, time.Second) - } - if f.conf.IsSet("Warehouse.retryTimeWindow") { - uj.config.retryTimeWindow = f.conf.GetDuration("Warehouse.retryTimeWindow", 180, time.Minute) - } else { - uj.config.retryTimeWindow = f.conf.GetDuration("Warehouse.retryTimeWindowInMins", 180, time.Minute) - } + uj.config.tableCountQueryTimeout = f.conf.GetDurationVar(30, time.Second, "Warehouse.tableCountQueryTimeout", "Warehouse.tableCountQueryTimeoutInS") + uj.config.longRunningUploadStatThresholdInMin = f.conf.GetDurationVar(120, time.Minute, "Warehouse.longRunningUploadStatThreshold", "Warehouse.longRunningUploadStatThresholdInMin") + uj.config.minUploadBackoff = f.conf.GetDurationVar(60, time.Second, "Warehouse.minUploadBackoff", "Warehouse.minUploadBackoffInS") + uj.config.maxUploadBackoff = f.conf.GetDurationVar(1800, time.Second, "Warehouse.maxUploadBackoff", "Warehouse.maxUploadBackoffInS") + uj.config.retryTimeWindow = f.conf.GetDurationVar(180, time.Minute, "Warehouse.retryTimeWindow", "Warehouse.retryTimeWindowInMins") uj.stats.uploadTime = uj.timerStat("upload_time") uj.stats.userTablesLoadTime = uj.timerStat("user_tables_load_time") diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go index 5638e84122..eaf0e28de5 100644 --- a/warehouse/utils/utils.go +++ b/warehouse/utils/utils.go @@ -133,7 +133,7 @@ const ( var ( pkgLogger logger.Logger enableIDResolution bool - AWSCredsExpiryInS int64 + awsCredsExpiryInS misc.ValueLoader[int64] TimeWindowDestinations = []string{S3Datalake, GCSDatalake, AzureDatalake} WarehouseDestinations = []string{RS, BQ, SNOWFLAKE, POSTGRES, CLICKHOUSE, MSSQL, AzureSynapse, S3Datalake, GCSDatalake, AzureDatalake, DELTALAKE} @@ -194,10 +194,9 @@ func Init() { pkgLogger = logger.NewLogger().Child("warehouse").Child("utils") } -// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated func loadConfig() { enableIDResolution = config.GetBoolVar(false, "Warehouse.enableIDResolution") - config.RegisterInt64ConfigVariable(3600, &AWSCredsExpiryInS, true, 1, "Warehouse.awsCredsExpiryInS") + awsCredsExpiryInS = config.GetReloadableInt64Var(3600, 1, "Warehouse.awsCredsExpiryInS") } type DeleteByMetaData struct { @@ -726,7 +725,8 @@ func GetTemporaryS3Cred(destination *backendconfig.DestinationT) (string, string // Create an STS client from just a session. svc := sts.New(awsSession) - sessionTokenOutput, err := svc.GetSessionToken(&sts.GetSessionTokenInput{DurationSeconds: &AWSCredsExpiryInS}) + expiryInSec := awsCredsExpiryInS.Load() + sessionTokenOutput, err := svc.GetSessionToken(&sts.GetSessionTokenInput{DurationSeconds: &expiryInSec}) if err != nil { return "", "", "", err }