diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 1208e4a303c..b7dcbbb68c2 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -17,16 +17,14 @@ limitations under the License. package txthrottler import ( + "context" "fmt" "strings" "sync" "time" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/encoding/prototext" - - "context" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" @@ -40,6 +38,61 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +// These vars store the functions used to create the topo server, healthcheck, +// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// in tests to generate mocks. +type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck +type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) + +var ( + healthCheckFactory healthCheckFactoryFunc + topologyWatcherFactory topologyWatcherFactoryFunc + throttlerFactory throttlerFactoryFunc +) + +func init() { + resetTxThrottlerFactories() +} + +func resetTxThrottlerFactories() { + healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { + return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + } + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { + return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) + } + throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { + return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) + } +} + +// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler +// It is only used here to allow mocking out a throttler object. +type ThrottlerInterface interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, th *discovery.TabletHealth) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() +} + +// TopologyWatcherInterface defines the public interface that is implemented by +// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out +// go/vt/discovery.LegacyTopologyWatcher. +type TopologyWatcherInterface interface { + Start() + Stop() +} + +// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with +// go/vt/throttler.GlobalManager. +const TxThrottlerName = "TransactionThrottler" + // TxThrottler throttles transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. @@ -85,10 +138,6 @@ type TxThrottler struct { requestsThrottled *stats.Counter } -// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with -// go/vt/throttler.GlobalManager. -const TxThrottlerName = "TransactionThrottler" - // NewTxThrottler tries to construct a TxThrottler from the // relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if // any error occurs. @@ -99,10 +148,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler { if err != nil { log.Errorf("Error creating transaction throttler. Transaction throttling will"+ " be disabled. Error: %v", err) - txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false}) - if err != nil { - panic("BUG: Can't create a disabled transaction throttler") - } + // newTxThrottler with disabled config never returns an error + txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false}) } else { log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config) } @@ -152,28 +199,6 @@ type txThrottlerConfig struct { healthCheckCells []string } -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, th *discovery.TabletHealth) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() -} - -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - Start() - Stop() -} - // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -186,35 +211,6 @@ type txThrottlerState struct { topologyWatchers []TopologyWatcherInterface } -// These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden -// in tests to generate mocks. -type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) - -var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc -) - -func init() { - resetTxThrottlerFactories() -} - -func resetTxThrottlerFactories() { - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) - } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) - } - throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { - return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) - } -} - func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) { if config.enabled { // Verify config. @@ -235,7 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, } // Open opens the transaction throttler. It must be called prior to 'Throttle'. -func (t *TxThrottler) Open() error { +func (t *TxThrottler) Open() (err error) { if !t.config.enabled { return nil } @@ -244,7 +240,6 @@ func (t *TxThrottler) Open() error { } log.Info("TxThrottler: opening") t.throttlerRunning.Set(1) - var err error t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell) return err } @@ -273,9 +268,6 @@ func (t *TxThrottler) Throttle() (result bool) { if !t.config.enabled { return false } - if t.state == nil { - panic("BUG: Throttle() called on a closed TxThrottler") - } result = t.state.throttle() t.requestsTotal.Add(1) if result { @@ -342,7 +334,8 @@ func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottler func (ts *txThrottlerState) throttle() bool { if ts.throttler == nil { - panic("BUG: throttle called after deallocateResources was called.") + log.Error("throttle called after deallocateResources was called") + return false } // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 1053068d14a..7a345563f3f 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" + throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -135,3 +136,34 @@ func TestEnabledThrottler(t *testing.T) { throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) } + +func TestNewTxThrottler(t *testing.T) { + config := tabletenv.NewDefaultConfig() + env := tabletenv.NewEnv(config, t.Name()) + + { + // disabled config + throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false}) + assert.Nil(t, err) + assert.NotNil(t, throttler) + } + { + // enabled with invalid throttler config + throttler, err := newTxThrottler(env, &txThrottlerConfig{ + enabled: true, + throttlerConfig: &throttlerdatapb.Configuration{}, + }) + assert.NotNil(t, err) + assert.Nil(t, throttler) + } + { + // enabled + throttler, err := newTxThrottler(env, &txThrottlerConfig{ + enabled: true, + healthCheckCells: []string{"cell1"}, + throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration, + }) + assert.Nil(t, err) + assert.NotNil(t, throttler) + } +}