Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reroute 'ALTER VITESS_MIGRATION ... THROTTLE ...' through topo #13511

Merged
merged 4 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -539,6 +540,8 @@ func testScheduler(t *testing.T) {
t.Run("ALTER both tables, elligible for concurrenct, with throttling", func(t *testing.T) {
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a "wait-for", because onlineddl.ThrottleAllMigrations is now an async flow.


// ALTER TABLE is allowed to run concurrently when no other ALTER is busy with copy state. Our tables are tiny so we expect to find both migrations running
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true)) // skip wait
Expand All @@ -555,6 +558,8 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
})
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a "wait-for", because onlineddl.ThrottleAllMigrations is now an async flow.


t.Run("check ready to complete (before)", func(t *testing.T) {
for _, uuid := range []string{t1uuid, t2uuid} {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand Down
65 changes: 30 additions & 35 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
ThrottledAppsTimeout = 60 * time.Second
)

// VtgateExecQuery runs a query on VTGate using given query params
func VtgateExecQuery(t *testing.T, vtParams *mysql.ConnParams, query string, expectError string) *sqltypes.Result {
t.Helper()
Expand Down Expand Up @@ -313,16 +316,35 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

found := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
found = true
ctx, cancel := context.WithTimeout(context.Background(), ThrottledAppsTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

appFound := false
for _, row := range r.Named().Rows {
if throttlerApp.Equals(row.AsString("app", "")) {
appFound = true
}
}
if appFound == expectFind {
// we're all good
return
}

select {
case <-ctx.Done():
assert.Failf(t, "CheckThrottledApps timed out waiting for %v to be in throttled status '%v'", throttlerApp.String(), expectFind)
return
case <-ticker.C:
}
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found)
}

// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp
Expand Down Expand Up @@ -350,33 +372,6 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st
return
}

// WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as enabled.
func WaitForThrottlerStatusEnabled(t *testing.T, tablet *cluster.Vttablet, timeout time.Duration) {
jsonPath := "IsEnabled"
url := fmt.Sprintf("http://localhost:%d/throttler/status", tablet.HTTPPort)

ctx, cancel := context.WithTimeout(context.Background(), throttlerConfigTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
body := getHTTPBody(url)
val, err := jsonparser.GetBoolean([]byte(body), jsonPath)
require.NoError(t, err)
if val {
return
}
select {
case <-ctx.Done():
t.Error("timeout waiting for tablet's throttler status to be enabled")
return
case <-ticker.C:
}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is just code I found was unused.

func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
Expand Down
26 changes: 12 additions & 14 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,23 @@ func UnthrottleApp(clusterInstance *cluster.LocalProcessCluster, throttlerApp th
return throttleApp(clusterInstance, throttlerApp, false)
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
func WaitUntilTabletsConfirmThrottledApp(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name, expectThrottled bool) {
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, true, ConfigTimeout)
WaitForThrottledApp(t, tablet, throttlerApp, expectThrottled, ConfigTimeout)
}
}
}
}

// ThrottleAppAndWaitUntilTabletsConfirm
func ThrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *cluster.LocalProcessCluster, throttlerApp throttlerapp.Name) (string, error) {
res, err := throttleApp(clusterInstance, throttlerApp, true)
if err != nil {
return res, err
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, true)
return res, nil
}

Expand All @@ -232,13 +236,7 @@ func UnthrottleAppAndWaitUntilTabletsConfirm(t *testing.T, clusterInstance *clus
if err != nil {
return res, err
}
for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
WaitForThrottledApp(t, tablet, throttlerApp, false, ConfigTimeout)
}
}
}
WaitUntilTabletsConfirmThrottledApp(t, clusterInstance, throttlerApp, false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file are just a simple refactor into a function. Not really related to the overall changes in the PR.

return res, nil
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) er
panic("implement me")
}

func (t *noopVCursor) ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error {
panic("implement me")
}

func (t *noopVCursor) ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
panic("implement me")
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

Expand Down Expand Up @@ -111,6 +112,8 @@ type (
ShowExec(ctx context.Context, command sqlparser.ShowCommandType, filter *sqlparser.ShowFilter) (*sqltypes.Result, error)
// SetExec takes in k,v pair and use executor to set them in topo metadata.
SetExec(ctx context.Context, name string, value string) error
// ThrottleApp sets a ThrottlerappRule in topo
ThrottleApp(ctx context.Context, throttleAppRule *topodatapb.ThrottledAppRule) error

// CanUseSetVar returns true if system_settings can use SET_VAR hint.
CanUseSetVar() bool
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat
case sqlparser.DDLStatement:
return buildGeneralDDLPlan(ctx, query, stmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
case *sqlparser.AlterMigration:
return buildAlterMigrationPlan(query, vschema, enableOnlineDDL)
return buildAlterMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.RevertMigration:
return buildRevertMigrationPlan(query, stmt, vschema, enableOnlineDDL)
case *sqlparser.ShowMigrationLogs:
Expand Down
65 changes: 64 additions & 1 deletion go/vt/vtgate/planbuilder/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,73 @@ limitations under the License.
package planbuilder

import (
"strconv"
"time"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

func buildAlterMigrationPlan(query string, vschema plancontext.VSchema, enableOnlineDDL bool) (*planResult, error) {
func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) {
switch alterMigrationType {
case sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
// Unthrottling is like throttling with duration=0
duration = 0
default:
duration = time.Hour * 24 * 365 * 100
if expireString != "" {
duration, err = time.ParseDuration(expireString)
if err != nil || duration < 0 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString)
}
}
}
ratio = 1.0
if ratioLiteral != nil {
ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64)
if err != nil || ratio < 0 || ratio > 1 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val)
}
}
return duration, ratio, nil
}

func buildAlterMigrationThrottleAppPlan(query string, alterMigration *sqlparser.AlterMigration, keyspace *vindexes.Keyspace) (*planResult, error) {
duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio)
if err != nil {
return nil, err
}
expireAt := time.Now().Add(duration)
appName := alterMigration.UUID
if appName == "" {
appName = throttlerapp.OnlineDDLName.String()
}
throttledAppRule := &topodatapb.ThrottledAppRule{
Name: appName,
ExpiresAt: logutil.TimeToProto(expireAt),
Ratio: ratio,
}
return newPlanResult(&engine.ThrottleApp{
Keyspace: keyspace,
ThrottledAppRule: throttledAppRule,
}), nil
}

func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigration, vschema plancontext.VSchema, enableOnlineDDL bool) (*planResult, error) {
if !enableOnlineDDL {
return nil, schema.ErrOnlineDDLDisabled
}

dest, ks, tabletType, err := vschema.TargetDestination("")
if err != nil {
return nil, err
Expand All @@ -38,6 +92,15 @@ func buildAlterMigrationPlan(query string, vschema plancontext.VSchema, enableOn
return nil, vterrors.VT09005()
}

switch alterMigration.Type {
case sqlparser.ThrottleMigrationType,
sqlparser.ThrottleAllMigrationType,
sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
// ALTER VITESS_MIGRATION ... THROTTLE ... queries go to topo (similarly to `vtctldclient UpdateThrottlerConfig`)
return buildAlterMigrationThrottleAppPlan(query, alterMigration, ks)
}

if tabletType != topodatapb.TabletType_PRIMARY {
return nil, vterrors.VT09006("ALTER")
}
Expand Down
50 changes: 50 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -1137,6 +1138,55 @@ func (vc *vcursorImpl) SetExec(ctx context.Context, name string, value string) e
return vc.executor.setVitessMetadata(ctx, name, value)
}

func (vc *vcursorImpl) ThrottleApp(ctx context.Context, throttledAppRule *topodatapb.ThrottledAppRule) (err error) {
if throttledAppRule == nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ThrottleApp: nil rule")
}
if throttledAppRule.Name == "" {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ThrottleApp: app name is empty")
}
// We don't strictly have to construct a UpdateThrottlerConfigRequest here, because we only populate it
// with a couple variables; we could do without it. However, constructing the request makes the remaining code
// consistent with vtctldclient/command/throttler.go and we prefer this consistency
req := &vtctldatapb.UpdateThrottlerConfigRequest{
Keyspace: vc.keyspace,
ThrottledApp: throttledAppRule,
}

update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig {
if throttlerConfig == nil {
throttlerConfig = &topodatapb.ThrottlerConfig{}
}
if throttlerConfig.ThrottledApps == nil {
throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule)
}
throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp
return throttlerConfig
}

ctx, unlock, lockErr := vc.topoServer.LockKeyspace(ctx, req.Keyspace, "UpdateThrottlerConfig")
if lockErr != nil {
return lockErr
}
defer unlock(&err)

ki, err := vc.topoServer.GetKeyspace(ctx, req.Keyspace)
if err != nil {
return err
}

ki.ThrottlerConfig = update(ki.ThrottlerConfig)

err = vc.topoServer.UpdateKeyspace(ctx, ki)
if err != nil {
return err
}

_, err = vc.topoServer.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update)

return err
}

func (vc *vcursorImpl) CanUseSetVar() bool {
return sqlparser.IsMySQL80AndAbove() && setVarEnabled
}
Expand Down