diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index d65e14a063c..4f1556dd17e 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -147,7 +147,7 @@ func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabl tabletBody := getHTTPBody(tabletURL) class := strings.ToLower(gjson.Get(tabletBody, "0.Class").String()) value := strings.ToLower(gjson.Get(tabletBody, "0.Value").String()) - if class == "unhappy" && strings.Contains(value, "primary: not serving") { + if class == "unhappy" && strings.Contains(value, "not serving") { log.Infof("tablet %s is Not Serving, so ignoring throttler status as the throttler will not be Opened", tablet.Alias) return } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index bfbc8725297..e94b22d7d66 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -225,7 +225,7 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv // CheckIsReady checks if this throttler is ready to serve. If not, it // returns an error. func (throttler *Throttler) CheckIsReady() error { - if throttler.IsOpen() && throttler.IsEnabled() { + if throttler.IsRunning() { // all good return nil } @@ -318,8 +318,6 @@ func (throttler *Throttler) normalizeThrottlerConfig(thottlerConfig *topodatapb. } func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspace, err error) bool { - throttler.initMutex.Lock() - defer throttler.initMutex.Unlock() log.Infof("Throttler: WatchSrvKeyspaceCallback called with: %+v", srvks) if err != nil { log.Errorf("WatchSrvKeyspaceCallback error: %v", err) @@ -327,15 +325,17 @@ func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspa } throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig) - if throttler.IsOpen() { - // Throttler is open/running and we should apply the config change + if throttler.IsEnabled() { + // Throttler is enabled and we should apply the config change // through Operate() or else we get into race conditions. go func() { log.Infof("Throttler: submitting a throttler config apply message with: %+v", throttlerConfig) throttler.throttlerConfigChan <- throttlerConfig }() } else { - // Throttler is not open/running, we should apply directly. + throttler.initMutex.Lock() + defer throttler.initMutex.Unlock() + // Throttler is not enabled, we should apply directly. throttler.applyThrottlerConfig(context.Background(), throttlerConfig) } @@ -460,7 +460,7 @@ func (throttler *Throttler) Open() error { defer retryTicker.Stop() for { if !throttler.IsOpen() { - // Throttler is not open/running so no need to keep retrying. + // Throttler is not open so no need to keep retrying. log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") return } @@ -582,6 +582,10 @@ func (throttler *Throttler) isDormant() bool { return time.Since(lastCheckTime) > dormantPeriod } +func (throttler *Throttler) IsRunning() bool { + return throttler.IsOpen() && throttler.IsEnabled() +} + // Operate is the main entry point for the throttler operation and logic. It will // run the probes, collect metrics, refresh inventory, etc. func (throttler *Throttler) Operate(ctx context.Context) {