Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: further code cleanup #12902

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 53 additions & 50 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ var (
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
Expand All @@ -69,6 +65,10 @@ func resetTxThrottlerFactories() {
}
}

func init() {
resetTxThrottlerFactories()
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Expand Down Expand Up @@ -132,14 +132,46 @@ type TxThrottler struct {
// if the TransactionThrottler is closed.
state *txThrottlerState

target *querypb.Target
target *querypb.Target
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
}

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
Expand All @@ -151,7 +183,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false})
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand All @@ -165,7 +197,7 @@ func (t *TxThrottler) InitDBConfig(target *querypb.Target) {

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, &txThrottlerConfig{enabled: false})
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}

var throttlerConfig throttlerdatapb.Configuration
Expand All @@ -178,48 +210,15 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(env, &txThrottlerConfig{
return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
topoServer: topoServer,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
})
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

topoServer *topo.Server
throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
}

func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
Expand All @@ -232,6 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler,
}
return &TxThrottler{
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
Expand All @@ -248,7 +248,7 @@ func (t *TxThrottler) Open() (err error) {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
return err
}

Expand Down Expand Up @@ -276,6 +276,9 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
return false
}

// Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value
// are less likely to be throttled.
Expand All @@ -288,7 +291,7 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -309,29 +312,29 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
config: config,
throttler: t,
}
createTxThrottlerHealthCheck(config, result, cell)
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)

result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
config.topoServer,
topoServer,
result.healthCheck,
cell,
keyspace,
shard,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
}
return result, nil
}

func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) {
func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells)
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func TestNewTxThrottler(t *testing.T) {

{
// disabled config
throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false})
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{enabled: false})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
{
// enabled with invalid throttler config
throttler, err := newTxThrottler(env, &txThrottlerConfig{
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{
enabled: true,
throttlerConfig: &throttlerdatapb.Configuration{},
})
Expand All @@ -169,7 +169,7 @@ func TestNewTxThrottler(t *testing.T) {
}
{
// enabled
throttler, err := newTxThrottler(env, &txThrottlerConfig{
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{
enabled: true,
healthCheckCells: []string{"cell1"},
throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,
Expand Down