Skip to content

Commit

Permalink
txthrottler: verify config at vttablet startup, consolidate funcs (vi…
Browse files Browse the repository at this point in the history
…tessio#13115)

* txthrottler: verify config at vttablet startup, consolidate funcs

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Use explicit dest in prototext.Unmarshal

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Use for loop for TestVerifyTxThrottlerConfig

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Cleanup test

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Fix go vet complaint

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Add back synonym flag

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go

Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Address staticcheck linter error

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
timvaillancourt and shlomi-noach committed May 16, 2024
1 parent 3bf58c3 commit 45c854b
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 129 deletions.
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,11 @@ Usage of vttablet:
--twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.
--twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions.
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
--use_super_read_only Set super_read_only flag when performing planned failover.
Expand Down
16 changes: 16 additions & 0 deletions go/flagutil/flagutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ func DualFormatBoolVar(fs *pflag.FlagSet, p *bool, name string, value bool, usag
}
}

// DualFormatVar creates a flag which supports both dashes and underscores
func DualFormatVar(fs *pflag.FlagSet, val pflag.Value, name string, usage string) {
dashes := strings.Replace(name, "_", "-", -1)
underscores := strings.Replace(name, "-", "_", -1)

fs.Var(val, underscores, usage)
if dashes != underscores {
fs.Var(val, dashes, fmt.Sprintf("Synonym to -%s", underscores))
}
}

type Value[T any] interface {
pflag.Value
Get() T
}

// DurationOrIntVar implements pflag.Value for flags that have historically been
// of type IntVar (and then converted to seconds or some other unit) but are
// now transitioning to a proper DurationVar type.
Expand Down
61 changes: 47 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These constants represent values for various config parameters.
Expand Down Expand Up @@ -88,6 +90,24 @@ var (
txLogHandler = "/debug/txlog"
)

type TxThrottlerConfigFlag struct {
*throttlerdatapb.Configuration
}

func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag {
return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}}
}

func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration {
return t.Configuration
}

func (t *TxThrottlerConfigFlag) Set(arg string) error {
return prototext.Unmarshal([]byte(arg), t.Configuration)
}

func (t *TxThrottlerConfigFlag) Type() string { return "string" }

// RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect
// some flags to be set with default values
func RegisterTabletEnvFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -143,7 +163,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(fs, &currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
flagutil.DualFormatVar(fs, currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
Expand Down Expand Up @@ -316,7 +336,7 @@ type TabletConfig struct {
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
Expand Down Expand Up @@ -470,9 +490,6 @@ func (c *TabletConfig) Verify() error {
if v := c.HotRowProtection.MaxConcurrency; v <= 0 {
return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
return nil
}

Expand Down Expand Up @@ -508,6 +525,22 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error {

// verifyTxThrottlerConfig checks the TxThrottler related config for sanity.
func (c *TabletConfig) verifyTxThrottlerConfig() error {
if !c.EnableTxThrottler {
return nil
}

err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}

if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
}
Expand All @@ -519,6 +552,7 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType)
}
}

return nil
}

Expand Down Expand Up @@ -606,17 +640,16 @@ var defaultConfig = TabletConfig{
EnablePerWorkloadTableMetrics: false,
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
// object in text format. It uses the object returned by
// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its
// fields. It panics on error.
func defaultTxThrottlerConfig() string {
// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on
// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of
// its fields. It panics on error.
func defaultTxThrottlerConfig() *TxThrottlerConfigFlag {
// Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields.
config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
// TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10
// and remove this line.
config.MaxReplicationLagSec = 10
return prototext.Format(config)
return &TxThrottlerConfigFlag{config}
}

func defaultTransactionLimitConfig() TransactionLimitConfig {
Expand Down
148 changes: 126 additions & 22 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestConfigParse(t *testing.T) {
Expand Down Expand Up @@ -325,31 +327,133 @@ func TestFlags(t *testing.T) {
assert.Equal(t, want, currentConfig)
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
func TestTxThrottlerConfigFlag(t *testing.T) {
f := NewTxThrottlerConfigFlag()
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
// default config (replica)
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String())
assert.Equal(t, "string", f.Type())
}
{
// replica + rdonly (allowed)
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.NotNil(t, f.Get())
assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec)
}
{
// no tablet types
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
assert.NotNil(t, f.Set("should not parse"))
}
{
// disallowed tablet type
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
invalidMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
invalidMaxReplicationLagModuleConfig.TargetReplicationLagSec = -1

type testConfig struct {
Name string
ExpectedErrorCode vtrpcpb.Code
//
EnableTxThrottler bool
TxThrottlerConfig *TxThrottlerConfigFlag
TxThrottlerHealthCheckCells []string
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag
TxThrottlerDefaultPriority int
}

tests := []testConfig{
{
// default (disabled)
Name: "default",
EnableTxThrottler: false,
},
{
// enabled with invalid throttler config
Name: "enabled invalid config",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig},
},
{
// enabled without cells defined
Name: "enabled without cells",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
},
{
// enabled with good config (default/replica tablet type)
Name: "enabled",
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
},
{
// enabled + replica and rdonly tablet types
Name: "enabled plus rdonly",
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
},
},
{
// enabled without tablet types
Name: "enabled without tablet types",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{},
},
{
// enabled + disallowed tablet type
Name: "enabled disallowed tablet type",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED},
},
{
// enabled + disallowed priority
Name: "enabled disallowed priority",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerDefaultPriority: 12345,
TxThrottlerHealthCheckCells: []string{"cell1"},
},
}

for _, test := range tests {
test := test
t.Run(test.Name, func(t *testing.T) {
t.Parallel()

config := defaultConfig
config.EnableTxThrottler = test.EnableTxThrottler
if test.TxThrottlerConfig == nil {
test.TxThrottlerConfig = NewTxThrottlerConfigFlag()
}
config.TxThrottlerConfig = test.TxThrottlerConfig
config.TxThrottlerHealthCheckCells = test.TxThrottlerHealthCheckCells
config.TxThrottlerDefaultPriority = test.TxThrottlerDefaultPriority
if test.TxThrottlerTabletTypes != nil {
config.TxThrottlerTabletTypes = test.TxThrottlerTabletTypes
}

err := config.verifyTxThrottlerConfig()
if test.ExpectedErrorCode == vtrpcpb.Code_OK {
assert.Nil(t, err)
} else {
assert.NotNil(t, err)
assert.Equal(t, test.ExpectedErrorCode, vterrors.Code(err))
}
})
}
}
Loading

0 comments on commit 45c854b

Please sign in to comment.