From 19e2021354337f555b2ffbae0af98d5de2baf27d Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Mon, 5 Jun 2023 18:59:59 +0300
Subject: [PATCH] Tablet throttler: throttler-config-via-topo defaults to 'true', deprecation message for old flags (#13130)

* Tablet throttler: --throttler-config-via-topo now defaults to 'true'
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* add deprecation message
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* endtoend tests: remove '--enable-lag-throttler' and use 'UpdateThrottlerConfig' everywhere
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* always use vtctldclient
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* use cluster.VtctldClientProcess
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* disable --throttler-config-via-topo in old throttler tests
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Remove --throttler-config-via-topo where used, since it now defaults to 'true'
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* fix vreplication cluster setup, waiting for throttler config to apply
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* changelog
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* extend throttler threshold
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* a bit more verbose
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* fixed CLI test
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* remove old '--enable-lag-throttler' flag, introduce '--heartbeat_on_demand_duration'
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* more log info in throttler.Open()
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* more logging
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Revert to --heartbeat_enable
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Protect throttler config change application with initMutex
And in e2e test update the throttler config on the keyspace when it's created.
Only wait for the new tablets in a shard to have the throttler enabled when adding a Shard.
Signed-off-by: Matt Lord

* More CI testing
Signed-off-by: Matt Lord

* CI testing cont
Signed-off-by: Matt Lord

* Yes...
Signed-off-by: Matt Lord

* Somebody doesn't like force pushes so msg here
Signed-off-by: Matt Lord

* Increase on-demand heartbeat duration from 10s to 1m
Signed-off-by: Matt Lord

* Use only on-demand heartbeats everywhere
Signed-off-by: Matt Lord

* Use same throttler config everywhere
Signed-off-by: Matt Lord

* Update all keyspaces and don't fail test on missing JSON keys
Signed-off-by: Matt Lord

* Use constant heartbeats in vrepl e2e tests
Until https://github.com/vitessio/vitess/issues/13175 is fixed.
Signed-off-by: Matt Lord

* Increase workflow command timeout
Signed-off-by: Matt Lord

* Don't wait for throttler on non-serving primaries
Signed-off-by: Matt Lord

* https://github.com/vitessio/vitess/issues/13175 is fixed, therefore re-instating on-demand heartbeats
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Added ToC
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Tweak comment and kick CI
Signed-off-by: Matt Lord

* Treat isOpen as the ready/running signal. Also align all initMutex usage.
Signed-off-by: Matt Lord

* Re-adjust comment
Signed-off-by: Matt Lord

* Adjust CheckIsReady() to match OnlineDDL's expectation/usage
This was only using IsReady() before, now it's using IsOpen() and IsReady().
Signed-off-by: Matt Lord

* Get rid of log messages from SrvKeyspaceWatcher when no node/key
Signed-off-by: Matt Lord

* More corrections/tweaks
Signed-off-by: Matt Lord

* Use more convenient/clear new IsRunning function
Signed-off-by: Matt Lord

* Revert "Use more convenient/clear new IsRunning function"
This reverts commit 9aef27655cabd6f54784a99283ecfd028df6143e as this change was not correct.
Signed-off-by: Matt Lord

* Further fix correct use of IsOpen(), IsRunning(), IsEnabled()
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* throttler.throttledApps cannot be nil
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* minor refactor/beautify for test
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* fix flakiness of tabletmanager_throttler_topo test by: (1) proper wait-for functions, and (2) issue different queries per goroutine
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Fix typo in release notes
Signed-off-by: Matt Lord

---------

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Signed-off-by: Matt Lord
Co-authored-by: Matt Lord
---
 changelog/17.0/17.0.0/summary.md              |   7 ++
 go/flags/endtoend/vttablet.txt                |   2 +-
 .../onlineddl/ghost/onlineddl_ghost_test.go   |   7 +-
 .../onlineddl/revert/onlineddl_revert_test.go |   6 +-
 .../scheduler/onlineddl_scheduler_test.go     |   7 +-
 .../onlineddl/vrepl/onlineddl_vrepl_test.go   |   4 +-
 .../onlineddl_vrepl_mini_stress_test.go       |   6 +-
 .../onlineddl_vrepl_stress_suite_test.go      |   6 +-
 .../vrepl_suite/onlineddl_vrepl_suite_test.go |   6 +-
 .../vrepl/schemadiff_vrepl_suite_test.go      |   7 +-
 .../tabletmanager/tablegc/tablegc_test.go     |   1 -
 .../tabletmanager/throttler/throttler_test.go |   2 +-
 .../throttler_custom_config/throttler_test.go |   2 +-
 .../throttler_topo/throttler_test.go          |  70 ++++++------
 go/test/endtoend/throttler/util.go            | 106 ++++++++++--------
 go/test/endtoend/vreplication/cluster_test.go |  32 +++++-
 .../resharding_workflows_v2_test.go           |   1 +
 go/vt/srvtopo/watch.go                        |   2 +-
 go/vt/vttablet/onlineddl/executor.go          |  24 ++--
 go/vt/vttablet/tabletserver/query_executor.go |   4 +-
 .../tabletserver/throttle/throttler.go        |  86 ++++++------
 21 files changed, 226 insertions(+), 162 deletions(-)

diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md
index 306a787fa71..252a27e2699 100644
--- a/changelog/17.0/17.0.0/summary.md
+++ b/changelog/17.0/17.0.0/summary.md
@@ -14,6 +14,7 @@
   - **[New command line flags and behavior](#new-flag)**
     - [Builtin backup: read buffering flags](#builtin-backup-read-buffering-flags)
     - [Manifest backup external decompressor command](#manifest-backup-external-decompressor-command)
+    - [Throttler config via topo enabled by default](#throttler-config-via-topo)
   - **[New stats](#new-stats)**
     - [Detailed backup and restore stats](#detailed-backup-and-restore-stats)
     - [VTtablet Error count with code](#vttablet-error-count-with-code)
@@ -164,6 +165,12 @@ This feature enables the following flow:
    ```
 2. Restore that backup with a mere `Restore` command, without having to specify `--external-decompressor`.
 
+#### vttablet --throttler-config-via-topo
+
+This flag was introduced in v16 and defaulted to `false`. In v17 it defaults to `true`, so there is no longer any need to supply it.
+
+Note that this flag overrides `--enable-lag-throttler` and `--throttle_threshold`, which now emit deprecation warnings and will be removed in v18.
+
 ### New stats
 
 #### Detailed backup and restore stats
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index 6a4e858a711..f7adc1292a8 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -316,7 +316,7 @@ Usage of vttablet:
   --throttle_metrics_threshold float   Override default throttle threshold, respective to --throttle_metrics_query (default 1.7976931348623157e+308)
   --throttle_tablet_types string   Comma separated VTTablet types to be considered by the throttler. default: 'replica'. example: 'replica,rdonly'. 'replica' aways implicitly included (default "replica")
   --throttle_threshold duration   Replication lag threshold for default lag throttling (default 1s)
-  --throttler-config-via-topo   When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self
+  --throttler-config-via-topo   When 'true', read config from topo service and ignore throttle_threshold, throttle_metrics_threshold, throttle_metrics_query, throttle_check_as_check_self (default true)
   --topo_consul_lock_delay duration   LockDelay for consul session. (default 15s)
   --topo_consul_lock_session_checks string   List of checks for consul session. (default "serfHealth")
   --topo_consul_lock_session_ttl string   TTL for consul session.
diff --git a/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go b/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go index c5ad11943cf..3dc635c8870 100644 --- a/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go +++ b/go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -166,9 +167,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -218,6 +216,9 @@ func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) shards = clusterInstance.Keyspaces[0].Shards assert.Equal(t, 2, len(shards)) + + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + testWithInitialSchema(t) t.Run("create non_online", func(t *testing.T) { _ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online", "") diff --git a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go index 6c90764b931..dd0b6d84a53 100644 --- a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go +++ b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -150,9 +151,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -205,6 +203,8 @@ func TestSchemaChange(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + t.Run("revertible", testRevertible) t.Run("revert", testRevert) } diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index c2eb46cf579..c9c60af2d85 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -187,9 +188,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--watch_replication_stream", @@ -234,6 +232,9 @@ func TestMain(m *testing.M) { } func TestSchemaChange(t *testing.T) { + + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + t.Run("scheduler", testScheduler) t.Run("singleton", testSingleton) t.Run("declarative", testDeclarative) diff --git 
a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index be88fc4618b..b012cd4f074 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -171,8 +171,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--throttler-config-via-topo", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -259,7 +257,7 @@ func TestSchemaChange(t *testing.T) { err := clusterInstance.WaitForTabletsToHealthyInVtgate() require.NoError(t, err) - _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false) + _, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "") require.NoError(t, err) for _, ks := range clusterInstance.Keyspaces { diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 0039361a37f..1f1f3b9c5b7 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -177,9 +178,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -232,6 +230,8 @@ func TestSchemaChange(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + t.Run("create schema", func(t *testing.T) { assert.Equal(t, 1, len(clusterInstance.Keyspaces[0].Shards)) testWithInitialSchema(t) diff --git a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go index 4c7965d1109..92fb7cf13e5 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go @@ -47,6 +47,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -429,9 +430,6 @@ func TestMain(m *testing.M) { // thereby examining lastPK on vcopier side. We will be iterating tables using non-PK order throughout // this test suite, and so the low setting ensures we hit the more interesting code paths. 
clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -485,6 +483,8 @@ func TestSchemaChange(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + for _, testcase := range testCases { require.NotEmpty(t, testcase.name) t.Run(testcase.name, func(t *testing.T) { diff --git a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go index 7e655b4b868..c8b87215036 100644 --- a/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_suite/onlineddl_vrepl_suite_test.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -82,9 +83,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -134,6 +132,8 @@ func TestSchemaChange(t *testing.T) { shards := clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + files, err := os.ReadDir(testDataPath) require.NoError(t, err) for _, f := range files { diff --git a/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go b/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go index 12491f1f152..2dc79840018 100644 --- a/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go +++ b/go/test/endtoend/schemadiff/vrepl/schemadiff_vrepl_suite_test.go @@ -25,6 +25,7 @@ import ( "regexp" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,6 +34,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/schemadiff" "vitess.io/vitess/go/vt/sqlparser" ) @@ -87,9 +89,6 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -139,6 +138,8 @@ func TestSchemaChange(t *testing.T) { shards := clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second) + files, err := os.ReadDir(testDataPath) require.NoError(t, err) for _, f := range files { diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go index eca52cbb106..1b43ecf2d90 100644 --- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go +++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go @@ -97,7 +97,6 @@ func TestMain(m *testing.M) { "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", - "--heartbeat_enable", "--heartbeat_interval", 
"250ms", "--gc_check_interval", gcCheckInterval.String(), "--gc_purge_check_interval", gcPurgeCheckInterval.String(), diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 28d0c287c24..5ca4bc32a87 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -96,12 +96,12 @@ func TestMain(m *testing.M) { // Set extra tablet args for lock timeout clusterInstance.VtTabletExtraArgs = []string{ + "--throttler-config-via-topo=false", "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", "--enable-lag-throttler", "--throttle_threshold", throttlerThreshold.String(), - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), "--disable_active_reparents", diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index d7968ebb4e0..e173384eb62 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -98,6 +98,7 @@ func TestMain(m *testing.M) { // Set extra tablet args for lock timeout clusterInstance.VtTabletExtraArgs = []string{ + "--throttler-config-via-topo=false", "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", @@ -105,7 +106,6 @@ func TestMain(m *testing.M) { "--throttle_metrics_query", "show global status like 'threads_running'", "--throttle_metrics_threshold", fmt.Sprintf("%d", testThreshold), "--throttle_check_as_check_self", - "--heartbeat_enable", "--heartbeat_interval", "250ms", } diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index a3204e0be2f..654870fae97 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -39,7 +39,7 @@ import ( const ( customQuery = "show global status like 'threads_running'" - customThreshold = 5 * time.Second + customThreshold = 5 unreasonablyLowThreshold = 1 * time.Millisecond extremelyHighThreshold = 1 * time.Hour onDemandHeartbeatDuration = 5 * time.Second @@ -112,8 +112,6 @@ func TestMain(m *testing.M) { "--lock_tables_timeout", "5s", "--watch_replication_stream", "--enable_replication_reporter", - "--throttler-config-via-topo", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", onDemandHeartbeatDuration.String(), "--disable_active_reparents", @@ -247,7 +245,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("enabling throttler with very low threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with the new config. 
@@ -259,7 +257,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("disabling throttler", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be disabled everywhere. @@ -273,7 +271,7 @@ func TestInitialThrottler(t *testing.T) { t.Run("enabling throttler, again", func(t *testing.T) { // Enable the throttler again with the default query which also moves us back // to the default threshold. - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere again with the default config. @@ -285,7 +283,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) }) t.Run("setting high threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config. @@ -297,7 +295,7 @@ func TestInitialThrottler(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) }) t.Run("setting low threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, true) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new config. @@ -438,15 +436,21 @@ func TestCustomQuery(t *testing.T) { defer cluster.PanicHandler(t) t.Run("enabling throttler with custom query and threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold.Seconds(), customQuery, false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery) assert.NoError(t, err) // Wait for the throttler to be enabled everywhere with new custom config. 
- for _, tablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets { - throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: customQuery, Threshold: customThreshold.Seconds()}, throttlerEnabledTimeout) + expectConfig := &throttler.Config{Query: customQuery, Threshold: customThreshold} + for _, ks := range clusterInstance.Keyspaces { + for _, shard := range ks.Shards { + for _, tablet := range shard.Vttablets { + throttler.WaitForThrottlerStatusEnabled(t, tablet, true, expectConfig, throttlerEnabledTimeout) + } + } } }) t.Run("validating OK response from throttler with custom query", func(t *testing.T) { + throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() @@ -455,28 +459,24 @@ func TestCustomQuery(t *testing.T) { t.Run("test threads running", func(t *testing.T) { sleepDuration := 20 * time.Second var wg sync.WaitGroup - for i := 0; i < int(customThreshold.Seconds()); i++ { - // Generate different Sleep() calls, all at minimum sleepDuration. - wg.Add(1) - go func(i int) { - defer wg.Done() - vtgateExec(t, fmt.Sprintf("select sleep(%d)", int(sleepDuration.Seconds())+i), "") - }(i) - } + t.Run("generate running queries", func(t *testing.T) { + for i := 0; i < customThreshold+1; i++ { + // Generate different Sleep() calls, all at minimum sleepDuration. + wg.Add(1) + go func(i int) { + defer wg.Done() + // Make sure to generate a different query in each goroutine, so that vtgate does not oversmart us + // and optimizes connections/caching. + query := fmt.Sprintf("select sleep(%d) + %d", int(sleepDuration.Seconds()), i) + vtgateExec(t, query, "") + }(i) + } + }) t.Run("exceeds threshold", func(t *testing.T) { - throttler.WaitForQueryResult(t, primaryTablet, - "select if(variable_value > 5, 'true', 'false') as result from performance_schema.global_status where variable_name='threads_running'", - "true", sleepDuration/3) - throttler.WaitForValidData(t, primaryTablet, sleepDuration-(5*time.Second)) - // Now we should be reporting ~ customThreshold*2 threads_running, and we should + // Now we should be reporting ~ customThreshold+1 threads_running, and we should // hit the threshold. For example: // {"StatusCode":429,"Value":6,"Threshold":5,"Message":"Threshold exceeded"} - { - resp, err := throttleCheck(primaryTablet, false) - require.NoError(t, err) - defer resp.Body.Close() - assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) - } + waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) @@ -486,15 +486,9 @@ func TestCustomQuery(t *testing.T) { }) t.Run("wait for queries to terminate", func(t *testing.T) { wg.Wait() - time.Sleep(1 * time.Second) // graceful time to let throttler read metrics }) t.Run("restored below threshold", func(t *testing.T) { - { - resp, err := throttleCheck(primaryTablet, false) - require.NoError(t, err) - defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) - } + waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) @@ -510,7 +504,7 @@ func TestRestoreDefaultQuery(t *testing.T) { // Validate going back from custom-query to default-query (replication lag) still works. 
t.Run("enabling throttler with default query and threshold", func(t *testing.T) { - _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, false) + _, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery) assert.NoError(t, err) // Wait for the throttler to be up and running everywhere again with the default config. diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index e8769999fc1..c6a7b2f69a5 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -21,13 +21,15 @@ import ( "fmt" "io" "net/http" + "strings" "testing" "time" - "github.com/buger/jsonparser" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" ) @@ -51,13 +53,8 @@ var DefaultConfig = &Config{ // This retries the command until it succeeds or times out as the // SrvKeyspace record may not yet exist for a newly created // Keyspace that is still initializing before it becomes serving. -func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) { +func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string) (result string, err error) { args := []string{} - clientfunc := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput - if !viaVtctldClient { - args = append(args, "--") - clientfunc = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput - } args = append(args, "UpdateThrottlerConfig") if enable { args = append(args, "--enable") @@ -74,7 +71,7 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena } else { args = append(args, "--check-as-check-shard") } - args = append(args, clusterInstance.Keyspaces[0].Name) + args = append(args, keyspaceName) ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout) defer cancel() @@ -83,7 +80,7 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena defer ticker.Stop() for { - result, err = clientfunc(args...) + result, err = vtctldProcess.ExecuteCommandWithOutput(args...) if err == nil { return result, nil } @@ -95,44 +92,91 @@ func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, ena } } +// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig. +// This retries the command until it succeeds or times out as the +// SrvKeyspace record may not yet exist for a newly created +// Keyspace that is still initializing before it becomes serving. 
+func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string) (string, error) { + rec := concurrency.AllErrorRecorder{} + var ( + err error + res strings.Builder + ) + for _, ks := range clusterInstance.Keyspaces { + ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery) + if err != nil { + rec.RecordError(err) + } + res.WriteString(ires) + } + if rec.HasErrors() { + err = rec.Error() + } + return res.String(), err +} + // WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as // enabled/disabled and have the provided config (if any) until the specified timeout. func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, enabled bool, config *Config, timeout time.Duration) { enabledJSONPath := "IsEnabled" queryJSONPath := "Query" thresholdJSONPath := "Threshold" - url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) + throttlerURL := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort) + tabletURL := fmt.Sprintf("http://localhost:%d/debug/status_details", tablet.HTTPPort) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - body := getHTTPBody(url) - isEnabled, err := jsonparser.GetBoolean([]byte(body), enabledJSONPath) - require.NoError(t, err) + throttlerBody := getHTTPBody(throttlerURL) + isEnabled := gjson.Get(throttlerBody, enabledJSONPath).Bool() if isEnabled == enabled { if config == nil { return } - query, err := jsonparser.GetString([]byte(body), queryJSONPath) - require.NoError(t, err) - threshold, err := jsonparser.GetFloat([]byte(body), thresholdJSONPath) - require.NoError(t, err) + query := gjson.Get(throttlerBody, queryJSONPath).String() + threshold := gjson.Get(throttlerBody, thresholdJSONPath).Float() if query == config.Query && threshold == config.Threshold { return } } + // If the tablet is Not Serving due to e.g. being involved in a + // Reshard where its QueryService is explicitly disabled, then + // we should not fail the test as the throttler will not be Open. + tabletBody := getHTTPBody(tabletURL) + class := strings.ToLower(gjson.Get(tabletBody, "0.Class").String()) + value := strings.ToLower(gjson.Get(tabletBody, "0.Value").String()) + if class == "unhappy" && strings.Contains(value, "not serving") { + log.Infof("tablet %s is Not Serving, so ignoring throttler status as the throttler will not be Opened", tablet.Alias) + return + } select { case <-ctx.Done(): t.Errorf("timed out waiting for the %s tablet's throttler status enabled to be %t with the correct config after %v; last seen value: %s", - tablet.Alias, enabled, timeout, body) + tablet.Alias, enabled, timeout, throttlerBody) return case <-ticker.C: } } } +// EnableLagThrottlerAndWaitForStatus is a utility function to enable the throttler at the beginning of an endtoend test. +// The throttler is configued to use the standard replication lag metric. The function waits until the throttler is confirmed +// to be running on all tablets. 
+func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.LocalProcessCluster, lag time.Duration) { + _, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "") + require.NoError(t, err) + + for _, ks := range clusterInstance.Keyspaces { + for _, shard := range ks.Shards { + for _, tablet := range shard.Vttablets { + WaitForThrottlerStatusEnabled(t, tablet, true, nil, time.Minute) + } + } + } +} + func getHTTPBody(url string) string { resp, err := http.Get(url) if err != nil { @@ -149,32 +193,6 @@ func getHTTPBody(url string) string { return body } -// WaitForQueryResult waits for a tablet to return the given result for the given -// query until the specified timeout. -// This is for simple queries that return 1 column in 1 row. It compares the result -// for that column as a string with the provided result. -func WaitForQueryResult(t *testing.T, tablet *cluster.Vttablet, query, result string, timeout time.Duration) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - res, err := tablet.VttabletProcess.QueryTablet(query, "", false) - require.NoError(t, err) - if res != nil && len(res.Rows) == 1 && res.Rows[0][0].ToString() == result { - return - } - select { - case <-ctx.Done(): - t.Errorf("timed out waiting for the %q query to produce a result of %q on tablet %s after %v; last seen value: %s", - query, result, tablet.Alias, timeout, res.Rows[0][0].ToString()) - return - case <-ticker.C: - } - } -} - // WaitForValidData waits for a tablet's checks to return a non 500 http response // which indicates that it's not able to provide valid results. This is most // commonly caused by the throttler still gathering the initial results for diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 046e0736698..12539b778de 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -37,6 +37,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" ) @@ -57,6 +58,8 @@ var ( extraVTTabletArgs = []string{} parallelInsertWorkers = "--vreplication-parallel-insert-workers=4" + + throttlerConfig = throttler.Config{Threshold: 15} ) // ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster @@ -364,6 +367,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, if err := vc.VtctldClient.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName); err != nil { t.Fatalf(err.Error()) } + + log.Infof("Applying throttler config for keyspace %s", keyspace.Name) + res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query) + require.NoError(t, err, res) + cellsToWatch := "" for i, cell := range cells { if i > 0 { @@ -392,7 +400,9 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, vc.StartVtgate(t, cell, cellsToWatch) } } - _ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName) + + err = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName) + require.NoError(t, err) return keyspace, nil } @@ -402,8 +412,7 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, options := []string{ 
"--queryserver-config-schema-reload-time", "5", - "--enable-lag-throttler", - "--heartbeat_enable", + "--heartbeat_on_demand_duration", "5s", "--heartbeat_interval", "250ms", } // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time" options = append(options, extraVTTabletArgs...) @@ -537,9 +546,26 @@ func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspa require.NotEqual(t, 0, primaryTabletUID, "Should have created a primary tablet") log.Infof("InitializeShard and make %d primary", primaryTabletUID) require.NoError(t, vc.VtctlClient.InitializeShard(keyspace.Name, shardName, cells[0].Name, primaryTabletUID)) + log.Infof("Finished creating shard %s", shard.Name) } + err := vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", keyspace.Name) + require.NoError(t, err) + + log.Infof("Waiting for throttler config to be applied on all shards") + for _, shard := range keyspace.Shards { + for _, tablet := range shard.Tablets { + clusterTablet := &cluster.Vttablet{ + Alias: tablet.Name, + HTTPPort: tablet.Vttablet.Port, + } + log.Infof("+ Waiting for throttler config to be applied on %s, type=%v", tablet.Name, tablet.Vttablet.TabletType) + throttler.WaitForThrottlerStatusEnabled(t, clusterTablet, true, nil, time.Minute) + } + } + log.Infof("Throttler config applied on all shards") + return nil } diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index cdf9d18523a..e12dbfa1cb1 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -125,6 +125,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, if tabletTypes != "" { args = append(args, "--tablet_types", tabletTypes) } + args = append(args, "--timeout", time.Minute.String()) ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) args = append(args, action, ksWorkflow) output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go index 2d571f4930f..2d470327c4e 100644 --- a/go/vt/srvtopo/watch.go +++ b/go/vt/srvtopo/watch.go @@ -195,7 +195,7 @@ func (entry *watchEntry) onErrorLocked(err error, init bool) { // This watcher will able to continue to return the last value till it is not able to connect to the topo server even if the cache TTL is reached. // TTL cache is only checked if the error is a known error i.e topo.Error. 
_, isTopoErr := err.(topo.Error) - if isTopoErr && time.Since(entry.lastValueTime) > entry.rw.cacheTTL { + if entry.value != nil && isTopoErr && time.Since(entry.lastValueTime) > entry.rw.cacheTTL { log.Errorf("WatchSrvKeyspace clearing cached entry for %v", entry.key) entry.value = nil } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 9c2981fd861..5f6899c6e1d 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2131,7 +2131,7 @@ func (e *Executor) ThrottleMigration(ctx context.Context, uuid string, expireStr if err != nil { return nil, err } - if err := e.lagThrottler.CheckIsReady(); err != nil { + if err := e.lagThrottler.CheckIsOpen(); err != nil { return nil, err } _ = e.lagThrottler.ThrottleApp(uuid, time.Now().Add(duration), ratio) @@ -2144,7 +2144,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin if err != nil { return nil, err } - if err := e.lagThrottler.CheckIsReady(); err != nil { + if err := e.lagThrottler.CheckIsOpen(); err != nil { return nil, err } _ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio) @@ -2153,7 +2153,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin // UnthrottleMigration func (e *Executor) UnthrottleMigration(ctx context.Context, uuid string) (result *sqltypes.Result, err error) { - if err := e.lagThrottler.CheckIsReady(); err != nil { + if err := e.lagThrottler.CheckIsOpen(); err != nil { return nil, err } defer e.triggerNextCheckInterval() @@ -2163,7 +2163,7 @@ func (e *Executor) UnthrottleMigration(ctx context.Context, uuid string) (result // UnthrottleAllMigrations func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltypes.Result, err error) { - if err := e.lagThrottler.CheckIsReady(); err != nil { + if err := e.lagThrottler.CheckIsOpen(); err != nil { return nil, err } defer e.triggerNextCheckInterval() @@ -3472,13 +3472,12 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i } var currentUserThrottleRatio float64 - if err := e.lagThrottler.CheckIsReady(); err == nil { - // No point in reviewing throttler info if it's not enabled&open - for _, app := range e.lagThrottler.ThrottledApps() { - if throttlerapp.OnlineDDLName.Equals(app.AppName) { - currentUserThrottleRatio = app.Ratio - break - } + + // No point in reviewing throttler info if it's not enabled&open + for _, app := range e.lagThrottler.ThrottledApps() { + if throttlerapp.OnlineDDLName.Equals(app.AppName) { + currentUserThrottleRatio = app.Ratio + break } } @@ -3593,7 +3592,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i } } go throttlerOnce.Do(func() { - if e.lagThrottler.CheckIsReady() != nil { + if !e.lagThrottler.IsRunning() { return } // Self healing: in the following scenario: @@ -3611,6 +3610,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // on-demand heartbeats, unlocking the deadlock. 
e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) }) + } } case schema.DDLStrategyPTOSC: diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 44f9db06256..c55fb3ac6b1 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -968,7 +968,7 @@ func (qre *QueryExecutor) execShowMigrationLogs() (*sqltypes.Result, error) { } func (qre *QueryExecutor) execShowThrottledApps() (*sqltypes.Result, error) { - if err := qre.tsv.lagThrottler.CheckIsReady(); err != nil { + if err := qre.tsv.lagThrottler.CheckIsOpen(); err != nil { return nil, err } if _, ok := qre.plan.FullStmt.(*sqlparser.ShowThrottledApps); !ok { @@ -1007,7 +1007,7 @@ func (qre *QueryExecutor) execShowThrottlerStatus() (*sqltypes.Result, error) { return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "Expecting SHOW VITESS_THROTTLER STATUS plan") } var enabled int32 - if err := qre.tsv.lagThrottler.CheckIsReady(); err == nil { + if qre.tsv.lagThrottler.IsEnabled() { enabled = 1 } result := &sqltypes.Result{ diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 83cb28ae849..917ff5a7eb4 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -72,7 +72,7 @@ var ( throttleMetricQuery string throttleMetricThreshold = math.MaxFloat64 throttlerCheckAsCheckSelf = false - throttlerConfigViaTopo = false + throttlerConfigViaTopo = true ) func init() { @@ -91,7 +91,7 @@ func registerThrottlerFlags(fs *pflag.FlagSet) { } var ( - ErrThrottlerNotReady = errors.New("throttler not enabled/ready") + ErrThrottlerNotOpen = errors.New("throttler not open") ) // ThrottleCheckType allows a client to indicate what type of check it wants to issue. See available types below. @@ -222,15 +222,6 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv return throttler } -// CheckIsReady checks if this throttler is ready to serve. If not, it returns an error -func (throttler *Throttler) CheckIsReady() error { - if throttler.IsEnabled() { - // all good - return nil - } - return ErrThrottlerNotReady -} - func (throttler *Throttler) StoreMetricsThreshold(threshold float64) { throttler.MetricsThreshold.Store(math.Float64bits(threshold)) } @@ -325,22 +316,25 @@ func (throttler *Throttler) WatchSrvKeyspaceCallback(srvks *topodatapb.SrvKeyspa throttlerConfig := throttler.normalizeThrottlerConfig(srvks.ThrottlerConfig) if throttler.IsEnabled() { - // Throttler is running and we should apply the config change through Operate() - // or else we get into race conditions. + // Throttler is enabled and we should apply the config change + // through Operate() or else we get into race conditions. go func() { log.Infof("Throttler: submitting a throttler config apply message with: %+v", throttlerConfig) throttler.throttlerConfigChan <- throttlerConfig }() } else { - // throttler is not running, we should apply directly + throttler.initMutex.Lock() + defer throttler.initMutex.Unlock() + // Throttler is not enabled, we should apply directly. throttler.applyThrottlerConfig(context.Background(), throttlerConfig) } return true } -// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration. 
This may cause -// the throttler to be enabled/disabled, and of course it affects the throttling query/threshold. +// applyThrottlerConfig receives a Throttlerconfig as read from SrvKeyspace, and applies the configuration. +// This may cause the throttler to be enabled/disabled, and of course it affects the throttling query/threshold. +// Note: you should be holding the initMutex when calling this function. func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerConfig *topodatapb.ThrottlerConfig) { if !throttlerConfigViaTopo { return @@ -364,6 +358,24 @@ func (throttler *Throttler) IsEnabled() bool { return atomic.LoadInt64(&throttler.isEnabled) > 0 } +func (throttler *Throttler) IsOpen() bool { + return atomic.LoadInt64(&throttler.isOpen) > 0 +} + +// CheckIsOpen checks if this throttler is ready to serve. If not, it +// returns an error. +func (throttler *Throttler) CheckIsOpen() error { + if throttler.IsOpen() { + // all good + return nil + } + return ErrThrottlerNotOpen +} + +func (throttler *Throttler) IsRunning() bool { + return throttler.IsOpen() && throttler.IsEnabled() +} + // Enable activates the throttler probes; when enabled, the throttler responds to check queries based on // the collected metrics. func (throttler *Throttler) Enable(ctx context.Context) bool { @@ -412,10 +424,14 @@ func (throttler *Throttler) Disable(ctx context.Context) bool { // Open opens database pool and initializes the schema func (throttler *Throttler) Open() error { + // TODO: remove `EnableLagThrottler` in v18 + if throttler.env.Config().EnableLagThrottler { + log.Warningf("The flags `--enable_lag_throttler` and `--throttle_threshold` will be removed in v18. Use 'vtctl UpdateThrottlerConfig', see https://vitess.io/docs/17.0/reference/programs/vtctldclient/vtctldclient_updatethrottlerconfig/") + } log.Infof("Throttler: started execution of Open. Acquiring initMutex lock") throttler.initMutex.Lock() defer throttler.initMutex.Unlock() - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { // already open log.Infof("Throttler: throttler is already open") return nil @@ -442,31 +458,34 @@ func (throttler *Throttler) Open() error { // opening of all other components. We thus read the throttler config in the background. // However, we want to handle a situation where the read errors out. // So we kick a loop that keeps retrying reading the config, for as long as this throttler is open. - go func() { - retryTicker := time.NewTicker(30 * time.Second) + retryReadAndApplyThrottlerConfig := func() { + retryInterval := 10 * time.Second + retryTicker := time.NewTicker(retryInterval) defer retryTicker.Stop() for { - if atomic.LoadInt64(&throttler.isOpen) == 0 { - // closed down. No need to keep retrying + if !throttler.IsOpen() { + // Throttler is not open so no need to keep retrying. + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") return } throttlerConfig, err := throttler.readThrottlerConfig(ctx) if err == nil { - // it's possible that during a retry-sleep, the throttler is closed and opened again, leading + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig) + // It's possible that during a retry-sleep, the throttler is closed and opened again, leading // to two (or more) instances of this goroutine. 
That's not a big problem; it's fine if all // attempt to read the throttler config; but we just want to ensure they don't step on each other // while applying the changes. throttler.initMutex.Lock() defer throttler.initMutex.Unlock() - throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable return } - log.Errorf("Throttler.Open(): error reading throttler config. Will retry in 1 minute. Err=%+v", err) + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err) <-retryTicker.C } - }() + } + go retryReadAndApplyThrottlerConfig() } else { // backwards-cmpatible: check for --enable-lag-throttler flag in vttablet // this will be removed in a future version @@ -483,7 +502,7 @@ func (throttler *Throttler) Close() { throttler.initMutex.Lock() log.Infof("Throttler: acquired initMutex lock") defer throttler.initMutex.Unlock() - if atomic.LoadInt64(&throttler.isOpen) == 0 { + if !throttler.IsOpen() { log.Infof("Throttler: throttler is not open") return } @@ -570,7 +589,6 @@ func (throttler *Throttler) isDormant() bool { // Operate is the main entry point for the throttler operation and logic. It will // run the probes, collect metrics, refresh inventory, etc. func (throttler *Throttler) Operate(ctx context.Context) { - tickers := [](*timer.SuspendableTicker){} addTicker := func(d time.Duration) *timer.SuspendableTicker { t := timer.NewSuspendableTicker(d, false) @@ -605,7 +623,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { // sparse shouldBeLeader := int64(0) - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { if throttler.tabletTypeFunc() == topodatapb.TabletType_PRIMARY { shouldBeLeader = 1 } @@ -631,7 +649,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlCollectTicker.C: { - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { // frequent if !throttler.isDormant() { throttler.collectMySQLMetrics(ctx) @@ -640,7 +658,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlDormantCollectTicker.C: { - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { // infrequent if throttler.isDormant() { throttler.collectMySQLMetrics(ctx) @@ -655,7 +673,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { case <-mysqlRefreshTicker.C: { // sparse - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { go throttler.refreshMySQLInventory(ctx) } } @@ -666,13 +684,13 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case <-mysqlAggregateTicker.C: { - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { throttler.aggregateMySQLMetrics(ctx) } } case <-throttledAppsTicker.C: { - if atomic.LoadInt64(&throttler.isOpen) > 0 { + if throttler.IsOpen() { go throttler.expireThrottledApps() } } @@ -1028,7 +1046,7 @@ func (throttler *Throttler) AppRequestMetricResult(ctx context.Context, appName // checkStore checks the aggregated value of given MySQL store func (throttler *Throttler) checkStore(ctx context.Context, appName string, storeName string, remoteAddr string, flags *CheckFlags) (checkResult *CheckResult) { - if !throttler.IsEnabled() { + if !throttler.IsRunning() { return okMetricCheckResult } if throttlerapp.ExemptFromChecks(appName) {
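
A minimal, self-contained Go sketch of the open/enabled/running split that the throttler.go hunks above converge on. This is not Vitess code: the `throttlerState` type, `errNotOpen` value, and `main()` below are illustrative stand-ins, but the method semantics mirror the diff — `Open()` reflects the tablet-server lifecycle, `Enable()` is driven by the ThrottlerConfig read from the topo service (now the default path with `--throttler-config-via-topo=true`), callers such as OnlineDDL's throttle/unthrottle commands only require the throttler to be open (`CheckIsOpen()`), while serving real check results additionally requires it to be enabled (`IsRunning()`).

```go
// Illustrative sketch only: throttlerState, errNotOpen, and main() are
// stand-ins, not Vitess types. The semantics mirror the patch above.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errNotOpen = errors.New("throttler not open")

// throttlerState models the two independent bits the real throttler tracks:
// isOpen (lifecycle: Open()/Close() called by the tablet server) and
// isEnabled (driven by the ThrottlerConfig read from the topo service).
type throttlerState struct {
	isOpen    int64
	isEnabled int64
}

func (t *throttlerState) Open()   { atomic.StoreInt64(&t.isOpen, 1) }
func (t *throttlerState) Enable() { atomic.StoreInt64(&t.isEnabled, 1) }

func (t *throttlerState) IsOpen() bool    { return atomic.LoadInt64(&t.isOpen) > 0 }
func (t *throttlerState) IsEnabled() bool { return atomic.LoadInt64(&t.isEnabled) > 0 }

// IsRunning: only an open *and* enabled throttler serves real check results;
// checkStore() in the patch short-circuits to an OK result otherwise.
func (t *throttlerState) IsRunning() bool { return t.IsOpen() && t.IsEnabled() }

// CheckIsOpen replaces the old CheckIsReady(): callers like OnlineDDL's
// ThrottleMigration only need the throttler to be open, not yet enabled.
func (t *throttlerState) CheckIsOpen() error {
	if t.IsOpen() {
		return nil
	}
	return errNotOpen
}

func main() {
	var t throttlerState

	fmt.Println(t.CheckIsOpen())                // "throttler not open"
	t.Open()                                    // tablet server opens the throttler
	fmt.Println(t.CheckIsOpen(), t.IsRunning()) // "<nil> false": open, but not yet enabled
	t.Enable()                                  // e.g. a topo ThrottlerConfig with Enabled=true is applied
	fmt.Println(t.IsRunning())                  // "true"
}
```

Operationally, with the flag defaulting to `true`, enabling/disabling the throttler and setting its threshold or custom query are done through `UpdateThrottlerConfig` (as exercised by `UpdateThrottlerTopoConfigRaw` in `go/test/endtoend/throttler/util.go` above) rather than through the deprecated `--enable-lag-throttler` / `--throttle_threshold` vttablet flags.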