Skip to content

Commit

Permalink
chore: use new reloadable config api for warehouse (#3920)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Sep 28, 2023
1 parent 7ca721c commit 6f2b7b9
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 109 deletions.
8 changes: 3 additions & 5 deletions warehouse/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type indexConstraint struct {

type constraintsManager struct {
constraintsMap map[string][]constraints
enableConstraintsViolations bool
enableConstraintsViolations misc.ValueLoader[bool]
}

func newConstraintsManager(conf *config.Config) *constraintsManager {
Expand Down Expand Up @@ -64,17 +64,15 @@ func newConstraintsManager(conf *config.Config) *constraintsManager {
},
},
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
conf.RegisterBoolConfigVariable(true, &cm.enableConstraintsViolations, true, "Warehouse.enableConstraintsViolations")
cm.enableConstraintsViolations = conf.GetReloadableBoolVar(true, "Warehouse.enableConstraintsViolations")

return cm
}

func (cm *constraintsManager) violatedConstraints(destinationType string, brEvent *BatchRouterEvent, columnName string) (cv *constraintsViolation) {
cv = &constraintsViolation{}

if !cm.enableConstraintsViolations {
if !cm.enableConstraintsViolations.Load() {
return
}

Expand Down
47 changes: 21 additions & 26 deletions warehouse/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ type router struct {
notifier *notifier.Notifier

config struct {
uploadFreqInS int64
noOfWorkers int
maxConcurrentUploadJobs int
allowMultipleSourcesForJobsPickup bool
enableJitterForSyncs bool
maxParallelJobCreation int
waitForWorkerSleep time.Duration
uploadAllocatorSleep time.Duration
mainLoopSleep time.Duration
stagingFilesBatchSize int
uploadStatusTrackFrequency time.Duration
warehouseSyncFreqIgnore bool
shouldPopulateHistoricIdentities bool
uploadFreqInS misc.ValueLoader[int64]
noOfWorkers misc.ValueLoader[int]
enableJitterForSyncs misc.ValueLoader[bool]
maxParallelJobCreation misc.ValueLoader[int]
mainLoopSleep misc.ValueLoader[time.Duration]
stagingFilesBatchSize misc.ValueLoader[int]
warehouseSyncFreqIgnore misc.ValueLoader[bool]
}

stats struct {
Expand Down Expand Up @@ -189,7 +189,13 @@ func newRouter(
r.config.uploadStatusTrackFrequency = r.conf.GetDurationVar(30, time.Minute, "Warehouse.uploadStatusTrackFrequency", "Warehouse.uploadStatusTrackFrequencyInMin")
r.config.allowMultipleSourcesForJobsPickup = r.conf.GetBoolVar(false, fmt.Sprintf(`Warehouse.%v.allowMultipleSourcesForJobsPickup`, whName))
r.config.shouldPopulateHistoricIdentities = r.conf.GetBoolVar(false, "Warehouse.populateHistoricIdentities")
r.setupReloadableVars(whName)
r.config.uploadFreqInS = r.conf.GetReloadableInt64Var(1800, 1, "Warehouse.uploadFreqInS")
r.config.noOfWorkers = r.conf.GetReloadableIntVar(8, 1, fmt.Sprintf(`Warehouse.%v.noOfWorkers`, whName), "Warehouse.noOfWorkers")
r.config.maxParallelJobCreation = r.conf.GetReloadableIntVar(8, 1, "Warehouse.maxParallelJobCreation")
r.config.mainLoopSleep = r.conf.GetReloadableDurationVar(5, time.Second, "Warehouse.mainLoopSleep", "Warehouse.mainLoopSleepInS")
r.config.stagingFilesBatchSize = r.conf.GetReloadableIntVar(960, 1, "Warehouse.stagingFilesBatchSize")
r.config.enableJitterForSyncs = r.conf.GetReloadableBoolVar(false, "Warehouse.enableJitterForSyncs")
r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")

r.stats.processingPendingJobsStat = r.statsFactory.NewTaggedStat("wh_processing_pending_jobs", stats.GaugeType, stats.Tags{
"destType": r.destType,
Expand Down Expand Up @@ -236,17 +242,6 @@ func newRouter(
return r, nil
}

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
func (r *router) setupReloadableVars(whName string) {
r.conf.RegisterInt64ConfigVariable(1800, &r.config.uploadFreqInS, true, 1, "Warehouse.uploadFreqInS")
r.conf.RegisterIntConfigVariable(8, &r.config.noOfWorkers, true, 1, fmt.Sprintf(`Warehouse.%v.noOfWorkers`, whName), "Warehouse.noOfWorkers")
r.conf.RegisterIntConfigVariable(8, &r.config.maxParallelJobCreation, true, 1, "Warehouse.maxParallelJobCreation")
r.conf.RegisterDurationConfigVariable(5, &r.config.mainLoopSleep, true, time.Second, "Warehouse.mainLoopSleep", "Warehouse.mainLoopSleepInS")
r.conf.RegisterIntConfigVariable(960, &r.config.stagingFilesBatchSize, true, 1, "Warehouse.stagingFilesBatchSize")
r.conf.RegisterBoolConfigVariable(false, &r.config.enableJitterForSyncs, true, "Warehouse.enableJitterForSyncs")
r.conf.RegisterBoolConfigVariable(false, &r.config.warehouseSyncFreqIgnore, true, "Warehouse.warehouseSyncFreqIgnore")
}

// Backend Config subscriber subscribes to backend-config and gets all the configurations that includes all sources, destinations and their latest values.
func (r *router) backendConfigSubscriber(ctx context.Context) {
for warehouses := range r.bcManager.Subscribe(ctx) {
Expand Down Expand Up @@ -397,7 +392,7 @@ loop:
r.logger.Debugf("Initial config fetched in runUploadJobAllocator for %s", r.destType)
}

availableWorkers := r.config.noOfWorkers - r.getActiveWorkerCount()
availableWorkers := r.config.noOfWorkers.Load() - r.getActiveWorkerCount()
if availableWorkers < 1 {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -537,12 +532,12 @@ func (r *router) mainLoop(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(r.config.mainLoopSleep):
case <-time.After(r.config.mainLoopSleep.Load()):
}
continue
}

jobCreationChan := make(chan struct{}, r.config.maxParallelJobCreation)
jobCreationChan := make(chan struct{}, r.config.maxParallelJobCreation.Load())

r.configSubscriberLock.RLock()

Expand Down Expand Up @@ -580,7 +575,7 @@ func (r *router) mainLoop(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(r.config.mainLoopSleep):
case <-time.After(r.config.mainLoopSleep.Load()):
}
}
}
Expand Down Expand Up @@ -667,7 +662,7 @@ func (r *router) handlePriorityForWaitingUploads(ctx context.Context, warehouse
}

func (r *router) uploadStartAfterTime() time.Time {
if r.config.enableJitterForSyncs {
if r.config.enableJitterForSyncs.Load() {
return timeutil.Now().Add(time.Duration(rand.Intn(15)) * time.Second)
}
return r.now()
Expand All @@ -683,7 +678,7 @@ func (r *router) createUploadJobsFromStagingFiles(ctx context.Context, warehouse
priority = 50
}

batches := service.StageFileBatching(stagingFiles, r.config.stagingFilesBatchSize)
batches := service.StageFileBatching(stagingFiles, r.config.stagingFilesBatchSize.Load())
for _, batch := range batches {
upload := model.Upload{
SourceID: warehouse.Source.ID,
Expand Down Expand Up @@ -736,7 +731,7 @@ func (r *router) uploadFrequencyExceeded(warehouse model.Warehouse, syncFrequenc
func (r *router) uploadFreqInS(syncFrequency string) int64 {
freqInMin, err := strconv.ParseInt(syncFrequency, 10, 64)
if err != nil {
return r.config.uploadFreqInS
return r.config.uploadFreqInS.Load()
}
return freqInMin * 60
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/router_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *router) canCreateUpload(ctx context.Context, warehouse model.Warehouse)
return true, nil
}

if r.config.warehouseSyncFreqIgnore {
if r.config.warehouseSyncFreqIgnore.Load() {
if r.uploadFrequencyExceeded(warehouse, "") {
return true, nil
}
Expand Down
21 changes: 14 additions & 7 deletions warehouse/router_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/ory/dockertest/v3"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
Expand Down Expand Up @@ -205,7 +207,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
}

r := router{}
r.config.warehouseSyncFreqIgnore = true
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.triggerStore = &sync.Map{}

canCreate, err := r.canCreateUpload(context.Background(), w)
Expand All @@ -224,8 +227,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.config.warehouseSyncFreqIgnore = true
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.createJobMarkerMap = make(map[string]time.Time)
r.triggerStore = &sync.Map{}

Expand All @@ -247,8 +250,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.config.warehouseSyncFreqIgnore = true
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.createJobMarkerMap = make(map[string]time.Time)
r.triggerStore = &sync.Map{}

Expand All @@ -275,6 +278,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {

r := router{}
r.triggerStore = &sync.Map{}
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false)
r.now = func() time.Time {
return time.Date(2009, time.November, 10, 5, 30, 0, 0, time.UTC)
}
Expand All @@ -298,7 +302,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false)
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.createJobMarkerMap = make(map[string]time.Time)
r.triggerStore = &sync.Map{}

Expand All @@ -323,7 +328,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false)
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.triggerStore = &sync.Map{}
r.createJobMarkerMap = make(map[string]time.Time)

Expand Down Expand Up @@ -413,6 +419,7 @@ func TestRouter_CanCreateUpload(t *testing.T) {

r := router{}
r.triggerStore = &sync.Map{}
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(false)
r.createJobMarkerMap = make(map[string]time.Time)
r.uploadRepo = repoUpload
r.now = func() time.Time {
Expand Down
54 changes: 30 additions & 24 deletions warehouse/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/rudderlabs/rudder-server/services/notifier"

"github.com/samber/lo"
Expand Down Expand Up @@ -194,9 +196,10 @@ func TestRouter(t *testing.T) {
r.stagingRepo = repoStaging
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.enableJitterForSyncs = misc.SingleValueLoader(true)
r.destType = destinationType
r.logger = logger.NOP
r.triggerStore = &sync.Map{}
Expand Down Expand Up @@ -349,9 +352,9 @@ func TestRouter(t *testing.T) {
r.stagingRepo = repoStaging
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.enableJitterForSyncs = misc.SingleValueLoader(true)
r.destType = destinationType
r.inProgressMap = make(map[workerIdentifierMapKey][]jobID)
r.triggerStore = &sync.Map{}
Expand Down Expand Up @@ -482,11 +485,13 @@ func TestRouter(t *testing.T) {
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
r.config.mainLoopSleep = time.Millisecond * 100
r.config.maxParallelJobCreation = 100
r.config.uploadFreqInS = misc.SingleValueLoader(int64(1800))
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.enableJitterForSyncs = misc.SingleValueLoader(true)
r.config.enableJitterForSyncs = misc.SingleValueLoader(true)
r.config.mainLoopSleep = misc.SingleValueLoader(time.Millisecond * 100)
r.config.maxParallelJobCreation = misc.SingleValueLoader(100)
r.destType = destinationType
r.logger = logger.NOP
r.now = func() time.Time {
Expand Down Expand Up @@ -587,8 +592,8 @@ func TestRouter(t *testing.T) {
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.allowMultipleSourcesForJobsPickup = true
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.destType = destinationType
r.logger = logger.NOP
r.tenantManager = multitenant.New(config.Default, mocksBackendConfig.NewMockBackendConfig(ctrl))
Expand Down Expand Up @@ -720,9 +725,9 @@ func TestRouter(t *testing.T) {
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.allowMultipleSourcesForJobsPickup = true
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.noOfWorkers = 10
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.noOfWorkers = misc.SingleValueLoader(10)
r.config.waitForWorkerSleep = time.Millisecond * 100
r.config.uploadAllocatorSleep = time.Millisecond * 100
r.destType = warehouseutils.RS
Expand Down Expand Up @@ -869,9 +874,9 @@ func TestRouter(t *testing.T) {
r.statsFactory = memstats.New()
r.conf = config.Default
r.config.allowMultipleSourcesForJobsPickup = true
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.noOfWorkers = 0
r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.noOfWorkers = misc.SingleValueLoader(0)
r.config.waitForWorkerSleep = time.Second * 5
r.config.uploadAllocatorSleep = time.Millisecond * 100
r.destType = warehouseutils.RS
Expand Down Expand Up @@ -957,11 +962,12 @@ func TestRouter(t *testing.T) {
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
r.conf = config.Default
r.config.stagingFilesBatchSize = 100
r.config.warehouseSyncFreqIgnore = true
r.config.enableJitterForSyncs = true
r.config.mainLoopSleep = time.Second * 5
r.config.maxParallelJobCreation = 100

r.config.stagingFilesBatchSize = misc.SingleValueLoader(100)
r.config.warehouseSyncFreqIgnore = misc.SingleValueLoader(true)
r.config.enableJitterForSyncs = misc.SingleValueLoader(true)
r.config.mainLoopSleep = misc.SingleValueLoader(time.Second * 5)
r.config.maxParallelJobCreation = misc.SingleValueLoader(100)
r.destType = destinationType
r.logger = logger.NOP
r.now = func() time.Time {
Expand Down
10 changes: 4 additions & 6 deletions warehouse/slave.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type slave struct {
encodingFactory *encoding.Factory

config struct {
noOfSlaveWorkerRoutines int
noOfSlaveWorkerRoutines misc.ValueLoader[int]
}
}

Expand All @@ -54,21 +54,19 @@ func newSlave(
s.bcManager = bcManager
s.constraintsManager = constraintsManager
s.encodingFactory = encodingFactory

// nolint:staticcheck // SA1019: config Register reloadable functions are deprecated
conf.RegisterIntConfigVariable(4, &s.config.noOfSlaveWorkerRoutines, true, 1, "Warehouse.noOfSlaveWorkerRoutines")
s.config.noOfSlaveWorkerRoutines = conf.GetReloadableIntVar(4, 1, "Warehouse.noOfSlaveWorkerRoutines")

return s
}

func (s *slave) setupSlave(ctx context.Context) error {
slaveID := misc.FastUUID().String()

jobNotificationChannel := s.notifier.Subscribe(ctx, slaveID, s.config.noOfSlaveWorkerRoutines)
jobNotificationChannel := s.notifier.Subscribe(ctx, slaveID, s.config.noOfSlaveWorkerRoutines.Load())

g, gCtx := errgroup.WithContext(ctx)

for workerIdx := 0; workerIdx <= s.config.noOfSlaveWorkerRoutines-1; workerIdx++ {
for workerIdx := 0; workerIdx <= s.config.noOfSlaveWorkerRoutines.Load()-1; workerIdx++ {
idx := workerIdx

g.Go(misc.WithBugsnagForWarehouse(func() error {
Expand Down
2 changes: 1 addition & 1 deletion warehouse/slave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestSlave(t *testing.T) {
subscribeCh: subscriberCh,
}

workers := 4
workers := misc.SingleValueLoader(4)
workerJobs := 25

tenantManager := multitenant.New(
Expand Down
Loading

0 comments on commit 6f2b7b9

Please sign in to comment.