Skip to content

Commit

Permalink
BackportAdd dry-run mode to the TxThrottler
Browse files Browse the repository at this point in the history
This is a backport of upstreamed vitessio#13604

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Aug 2, 2023
1 parent 7203c35 commit a4b60a9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 12 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,8 @@ max_rate_approach_threshold: 0.9
)
--tx-throttler-default-priority int
Default priority assigned to queries that lack priority information.
--tx-throttler-dry-run bool
If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.
--tx-throttler-healthcheck-cells value
Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types value
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func init() {
flagutil.DualFormatStringListVar(&currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
flag.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information.")
topoproto.TabletTypeListVar(&currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)")
flag.BoolVar(&currentConfig.TxThrottlerDryRun, "tx-throttler-dry-run", defaultConfig.TxThrottlerDryRun, "If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.")

flag.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
flag.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -306,6 +307,7 @@ type TabletConfig struct {
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes []topodatapb.TabletType `json:"-"`
TxThrottlerDryRun bool `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -538,6 +540,7 @@ var defaultConfig = TabletConfig{
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: 0, // This leads to all queries being candidates to throttle
TxThrottlerDryRun: false,

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
38 changes: 26 additions & 12 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type txThrottler struct {

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
state *txThrottlerState
state txThrottlerState

target *querypb.Target
topoServer *topo.Server
Expand All @@ -154,6 +154,10 @@ type txThrottlerConfig struct {
// returns false.
enabled bool

// if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
// total and actually throttled requests, but it will not actually return that a transaction should be throttled.
dryRun bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
Expand All @@ -162,8 +166,14 @@ type txThrottlerConfig struct {
tabletTypes []topodatapb.TabletType
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
type txThrottlerState interface {
deallocateResources()
StatsUpdate(tabletStats *discovery.LegacyTabletStats)
throttle() bool
}

// txThrottlerStateImpl holds the state of an open TxThrottler object.
type txThrottlerStateImpl struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -187,7 +197,10 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: false,
dryRun: env.Config().TxThrottlerDryRun,
})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand All @@ -201,7 +214,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false, dryRun: env.Config().TxThrottlerDryRun})
}

var throttlerConfig throttlerdatapb.Configuration
Expand All @@ -216,6 +229,7 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrott

return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
dryRun: env.Config().TxThrottlerDryRun,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
Expand Down Expand Up @@ -294,10 +308,10 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
t.requestsThrottled.Add(workload, 1)
}

return result
return result && !t.config.dryRun
}

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -314,7 +328,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
t.Close()
return nil, err
}
result := &txThrottlerState{
result := &txThrottlerStateImpl{
config: config,
throttler: t,
}
Expand All @@ -337,7 +351,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
return result, nil
}

func (ts *txThrottlerState) throttle() bool {
func (ts *txThrottlerStateImpl) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
return false
Expand All @@ -348,7 +362,7 @@ func (ts *txThrottlerState) throttle() bool {
return ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerState) deallocateResources() {
func (ts *txThrottlerStateImpl) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
Expand All @@ -360,14 +374,14 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
}

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.LegacyTabletStats) {
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletStats) {
if ts.config.tabletTypes == nil {
return
}
Expand Down
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,54 @@ func TestNewTxThrottler(t *testing.T) {
assert.NotNil(t, throttler)
}
}

func TestDryRunThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

testCases := []struct {
Name string
txThrottlerStateShouldThrottle bool
throttlerDryRun bool
expectedResult bool
}{
{Name: "Real run throttles when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: false, expectedResult: true},
{Name: "Real run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: false, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: true, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: true, expectedResult: false},
}

for _, aTestCase := range testCases {
theTestCase := aTestCase

t.Run(theTestCase.Name, func(t *testing.T) {
aTxThrottler := &txThrottler{
config: &txThrottlerConfig{
enabled: true,
dryRun: theTestCase.throttlerDryRun,
},
state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle},
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}

assert.Equal(t, theTestCase.expectedResult, aTxThrottler.Throttle(100, "some-workload"))
})
}
}

type mockTxThrottlerState struct {
shouldThrottle bool
}

func (t *mockTxThrottlerState) deallocateResources() {

}
func (t *mockTxThrottlerState) StatsUpdate(*discovery.LegacyTabletStats) {

}

func (t *mockTxThrottlerState) throttle() bool {
return t.shouldThrottle
}

0 comments on commit a4b60a9

Please sign in to comment.