Skip to content

Commit

Permalink
Merge branch 'slack-vitess-r14.0.5-dsdefense' into upgrade-go-slack-v…
Browse files Browse the repository at this point in the history
…itess-r14.0.5
  • Loading branch information
maksimov authored Aug 10, 2023
2 parents cc27630 + 333c488 commit 472435d
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 34 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,8 @@ max_rate_approach_threshold: 0.9
)
--tx-throttler-default-priority int
Default priority assigned to queries that lack priority information.
--tx-throttler-dry-run
If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.
--tx-throttler-healthcheck-cells value
Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types value
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand All @@ -215,7 +215,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,6 @@ func (m mockTxThrottler) Open() (err error) {
func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) {
return m.throttle
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func init() {
flagutil.DualFormatStringListVar(&currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
flag.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information.")
topoproto.TabletTypeListVar(&currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)")
flag.BoolVar(&currentConfig.TxThrottlerDryRun, "tx-throttler-dry-run", defaultConfig.TxThrottlerDryRun, "If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.")

flag.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
flag.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -306,6 +307,7 @@ type TabletConfig struct {
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes []topodatapb.TabletType `json:"-"`
TxThrottlerDryRun bool `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -538,6 +540,7 @@ var defaultConfig = TabletConfig{
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: 0, // This leads to all queries being candidates to throttle
TxThrottlerDryRun: false,

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) {
return errTxThrottled
}
var beginSQL string
Expand Down
54 changes: 34 additions & 20 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int) (result bool)
Throttle(priority int, workload string) (result bool)
}

func init() {
Expand Down Expand Up @@ -135,15 +135,15 @@ type txThrottler struct {

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
state *txThrottlerState
state txThrottlerState

target *querypb.Target
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
requestsTotal *stats.CountersWithSingleLabel
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
Expand All @@ -154,6 +154,10 @@ type txThrottlerConfig struct {
// returns false.
enabled bool

// if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
// total and actually throttled requests, but it will not actually return that a transaction should be throttled.
dryRun bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
Expand All @@ -162,8 +166,14 @@ type txThrottlerConfig struct {
tabletTypes []topodatapb.TabletType
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
type txThrottlerState interface {
deallocateResources()
StatsUpdate(tabletStats *discovery.LegacyTabletStats)
throttle() bool
}

// txThrottlerStateImpl holds the state of an open TxThrottler object.
type txThrottlerStateImpl struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -187,7 +197,10 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: false,
dryRun: env.Config().TxThrottlerDryRun,
})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand All @@ -201,7 +214,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false, dryRun: env.Config().TxThrottlerDryRun})
}

var throttlerConfig throttlerdatapb.Configuration
Expand All @@ -216,6 +229,7 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrott

return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
dryRun: env.Config().TxThrottlerDryRun,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
Expand All @@ -237,8 +251,8 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}, nil
}

Expand Down Expand Up @@ -276,7 +290,7 @@ func (t *txThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int) (result bool) {
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
return false
}
Expand All @@ -289,15 +303,15 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority

t.requestsTotal.Add(1)
t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(1)
t.requestsThrottled.Add(workload, 1)
}

return result
return result && !t.config.dryRun
}

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -314,7 +328,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
t.Close()
return nil, err
}
result := &txThrottlerState{
result := &txThrottlerStateImpl{
config: config,
throttler: t,
}
Expand All @@ -337,7 +351,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
return result, nil
}

func (ts *txThrottlerState) throttle() bool {
func (ts *txThrottlerStateImpl) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
return false
Expand All @@ -348,7 +362,7 @@ func (ts *txThrottlerState) throttle() bool {
return ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerState) deallocateResources() {
func (ts *txThrottlerStateImpl) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
Expand All @@ -360,14 +374,14 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
}

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.LegacyTabletStats) {
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletStats) {
if ts.config.tabletTypes == nil {
return
}
Expand Down
71 changes: 61 additions & 10 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0))
assert.False(t, throttler.Throttle(0, "some-workload"))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -126,9 +126,9 @@ func TestEnabledThrottler(t *testing.T) {
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

assert.False(t, throttler.Throttle(100))
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])

throttler.state.StatsUpdate(tabletStats)
rdonlyTabletStats := &discovery.LegacyTabletStats{
Expand All @@ -139,14 +139,14 @@ func TestEnabledThrottler(t *testing.T) {
// This call should not be forwarded to the go/vt/throttler.Throttler object.
hcListener.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.
assert.True(t, throttler.Throttle(100))
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
assert.True(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttler.Throttle(0))
assert.Equal(t, int64(3), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down Expand Up @@ -181,3 +181,54 @@ func TestNewTxThrottler(t *testing.T) {
assert.NotNil(t, throttler)
}
}

func TestDryRunThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

testCases := []struct {
Name string
txThrottlerStateShouldThrottle bool
throttlerDryRun bool
expectedResult bool
}{
{Name: "Real run throttles when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: false, expectedResult: true},
{Name: "Real run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: false, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should", txThrottlerStateShouldThrottle: true, throttlerDryRun: true, expectedResult: false},
{Name: "Dry run does not throttle when txThrottlerStateImpl says it should not", txThrottlerStateShouldThrottle: false, throttlerDryRun: true, expectedResult: false},
}

for _, aTestCase := range testCases {
theTestCase := aTestCase

t.Run(theTestCase.Name, func(t *testing.T) {
aTxThrottler := &txThrottler{
config: &txThrottlerConfig{
enabled: true,
dryRun: theTestCase.throttlerDryRun,
},
state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle},
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}

assert.Equal(t, theTestCase.expectedResult, aTxThrottler.Throttle(100, "some-workload"))
})
}
}

type mockTxThrottlerState struct {
shouldThrottle bool
}

func (t *mockTxThrottlerState) deallocateResources() {

}
func (t *mockTxThrottlerState) StatsUpdate(*discovery.LegacyTabletStats) {

}

func (t *mockTxThrottlerState) throttle() bool {
return t.shouldThrottle
}

0 comments on commit 472435d

Please sign in to comment.