Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: verify config at vttablet startup, consolidate funcs #13115

Merged
3 changes: 1 addition & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ Usage of vttablet:
--twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.
--twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions.
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
--v Level log level for V logs
Expand Down
61 changes: 47 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These constants represent values for various config parameters.
Expand Down Expand Up @@ -89,6 +91,24 @@ var (
txLogHandler = "/debug/txlog"
)

type TxThrottlerConfigFlag struct {
*throttlerdatapb.Configuration
}

func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag {
return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}}
}

func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration {
return t.Configuration
}

func (t *TxThrottlerConfigFlag) Set(arg string) error {
return prototext.Unmarshal([]byte(arg), t)
}

func (t *TxThrottlerConfigFlag) Type() string { return "string" }

// RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect
// some flags to be set with default values
func RegisterTabletEnvFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -158,7 +178,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(fs, &currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
fs.Var(currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
Expand Down Expand Up @@ -340,7 +360,7 @@ type TabletConfig struct {
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
Expand Down Expand Up @@ -657,9 +677,6 @@ func (c *TabletConfig) Verify() error {
if v := c.HotRowProtection.MaxConcurrency; v <= 0 {
return fmt.Errorf("--hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this to verifyTxThrottlerConfig()

return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
return nil
}

Expand Down Expand Up @@ -695,6 +712,22 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error {

// verifyTxThrottlerConfig checks the TxThrottler related config for sanity.
func (c *TabletConfig) verifyTxThrottlerConfig() error {
if !c.EnableTxThrottler {
return nil
}

err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}

if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
}
Expand All @@ -706,6 +739,7 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType)
}
}

return nil
}

Expand Down Expand Up @@ -812,17 +846,16 @@ var defaultConfig = TabletConfig{
EnablePerWorkloadTableMetrics: false,
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
// object in text format. It uses the object returned by
// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its
// fields. It panics on error.
func defaultTxThrottlerConfig() string {
// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on
// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of
// its fields. It panics on error.
func defaultTxThrottlerConfig() *TxThrottlerConfigFlag {
// Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields.
config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
// TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10
// and remove this line.
config.MaxReplicationLagSec = 10
return prototext.Format(config)
return &TxThrottlerConfigFlag{config}
}

func defaultTransactionLimitConfig() TransactionLimitConfig {
Expand Down
84 changes: 78 additions & 6 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestConfigParse(t *testing.T) {
Expand Down Expand Up @@ -332,31 +334,101 @@ func TestFlags(t *testing.T) {
assert.Equal(t, want, currentConfig)
}

func TestTxThrottlerConfigFlag(t *testing.T) {
f := NewTxThrottlerConfigFlag()
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String())
assert.Equal(t, "string", f.Type())
}
{
defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.NotNil(t, f.Get())
assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec)
}
{
assert.NotNil(t, f.Set("should not parse"))
}
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
// default config (replica)
// default (disabled)
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// replica + rdonly (allowed)
// enabled without throttler config
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
currentConfig.EnableTxThrottler = true
assert.NotNil(t, currentConfig.verifyTxThrottlerConfig())
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// enabled without cells defined
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// enabled with good config (default/replica tablet type)
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// enabled + replica and rdonly tablet types
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// no tablet types
// enabled without tablet types
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// disallowed tablet type
// enabled + disallowed tablet type
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}
{
// enabled + disallowed priority
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}
currentConfig.TxThrottlerDefaultPriority = 12345
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}
}
74 changes: 22 additions & 52 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package txthrottler

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -186,64 +184,36 @@ type txThrottlerState struct {
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
return txThrottler
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}

var throttlerConfig throttlerdatapb.Configuration
if err := prototext.Unmarshal([]byte(env.Config().TxThrottlerConfig), &throttlerConfig); err != nil {
return nil, err
defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
}

// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
})
}

func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
if err != nil {
return nil, err
}
if len(config.healthCheckCells) == 0 {
return nil, fmt.Errorf("empty healthCheckCells given. %+v", config)
}
}
return &txThrottler{
config: config,
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}, nil
}
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
Expand Down
Loading