Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect ExecuteFetchAsDBA against multi-statements, excluding a sequence of CREATE TABLE|VIEW. #14954

Merged
8 changes: 4 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ func tearDown(t *testing.T, initMysql bool) {
}
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := "STOP SLAVE; RESET SLAVE ALL; RESET MASTER;"
disableSemiSyncCommands := "SET GLOBAL rpl_semi_sync_master_enabled = false; SET GLOBAL rpl_semi_sync_slave_enabled = false"
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
_, err := tablet.VttabletProcess.QueryTablet(promoteCommands, keyspaceName, true)
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
_, err = tablet.VttabletProcess.QueryTablet(disableSemiSyncCommands, keyspaceName, true)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)
}

Expand Down
48 changes: 45 additions & 3 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,51 @@ func testTabletStatus(t *testing.T) {
}

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
tcases := []struct {
query string
result string
expectErr bool
}{
{
query: "",
expectErr: true,
},
{
query: "SELECT 1 AS a",
result: oneTableOutput,
},
{
query: "SELECT 1 AS a; SELECT 1 AS a",
expectErr: true,
},
{
query: "create table t(id int)",
result: "",
},
{
query: "create table if not exists t(id int)",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int);",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int); SELECT 1 AS a",
expectErr: true,
},
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tcase.result, result)
}
})
}
}

func testExecuteAsApp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/mysqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
PARTITION BY HASH( TO_DAYS(created) )
PARTITIONS 10;
`
createProcSQL = `use vt_test_keyspace;
createProcSQL = `
CREATE PROCEDURE testing()
BEGIN
delete from vt_insert_test;
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMain(m *testing.M) {
}

primaryTabletProcess := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil {
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, true); err != nil {
return 1, err
}

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func TestNoReplicationStatusAndIOThreadStopped(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `RESET SLAVE ALL`)
require.NoError(t, err)
//
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[2] and be in its relay log.
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// make tablets[1 lag from the other tablets by setting the delay to a large number
utils.RunSQL(context.Background(), t, `stop slave;CHANGE MASTER TO MASTER_DELAY = 1999;start slave;`, tablets[1])
utils.RunSQLs(context.Background(), t, []string{`stop slave`, `CHANGE MASTER TO MASTER_DELAY = 1999`, `start slave;`}, tablets[1])

// insert another row in tablets[1
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})
Expand Down Expand Up @@ -226,26 +226,33 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
}

// commands to convert a replica to be writable
promoteReplicaCommands := "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only = OFF;"
utils.RunSQL(ctx, t, promoteReplicaCommands, tablets[1])
promoteReplicaCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}
utils.RunSQLs(ctx, t, promoteReplicaCommands, tablets[1])

// Get primary position
_, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname)

// tablets[0] will now be a replica of tablets[1
changeReplicationSourceCommands := fmt.Sprintf("RESET MASTER; RESET SLAVE; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[0])
changeReplicationSourceCommands := []string{
"RESET MASTER",
"RESET SLAVE",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])

// Capture time when we made tablets[1 writable
baseTime := time.Now().UnixNano() / 1000000000

// tablets[2 will be a replica of tablets[1
changeReplicationSourceCommands = fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[2])
changeReplicationSourceCommands = []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
"START SLAVE",
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

// To test the downPrimary, we kill the old primary first and delete its tablet record
if downPrimary {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
results = append(results, result)
}
return results
}

// RunSQL is used to run a SQL command directly on the MySQL instance of a vttablet
func RunSQL(ctx context.Context, t *testing.T, sql string, tablet *cluster.Vttablet) *sqltypes.Result {
tabletParams := getMysqlConnParam(tablet)
Expand Down
21 changes: 14 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
// point replica at otherReplica
changeReplicationSourceCommand := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", utils.Hostname, otherReplica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommand, replica, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
require.NoError(t, err)

// wait until the source port is set back correctly by vtorc
Expand All @@ -204,10 +208,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommands, curPrimary, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")
require.NoError(t, err)

// wait for curPrimary to reach stable state
Expand Down
75 changes: 75 additions & 0 deletions go/vt/sqlparser/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,77 @@ func TestColumns_FindColumn(t *testing.T) {
}
}

func TestSplitStatements(t *testing.T) {
testcases := []struct {
input string
stmts int
wantErr bool
}{
{
input: "select * from table1; \t; \n; \n\t\t ;select * from table1;",
stmts: 2,
}, {
input: "select * from table1",
stmts: 1,
}, {
input: "select * from table1;",
stmts: 1,
}, {
input: "select * from table1; ",
stmts: 1,
}, {
input: "select * from table1; select * from table2;",
stmts: 2,
}, {
input: "create /*vt+ directive=true */ table t1 (id int); create table t2 (id int); create table t3 (id int)",
stmts: 3,
}, {
input: "create /*vt+ directive=true */ table t1 (id int); create table t2 (id int); create table t3 (id int);",
stmts: 3,
}, {
input: "select * from /* comment ; */ table1;",
stmts: 1,
}, {
input: "select * from table1 where semi = ';';",
stmts: 1,
}, {
input: "CREATE TABLE `total_data` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', " +
"`region` varchar(32) NOT NULL COMMENT 'region name, like zh; th; kepler'," +
"`data_size` bigint NOT NULL DEFAULT '0' COMMENT 'data size;'," +
"`createtime` datetime NOT NULL DEFAULT NOW() COMMENT 'create time;'," +
"`comment` varchar(100) NOT NULL DEFAULT '' COMMENT 'comment'," +
"PRIMARY KEY (`id`))",
stmts: 1,
}, {
input: "create table t1 (id int primary key); create table t2 (id int primary key);",
stmts: 2,
}, {
input: ";;; create table t1 (id int primary key);;; ;create table t2 (id int primary key);",
stmts: 2,
}, {
input: ";create table t1 ;create table t2 (id;",
wantErr: true,
}, {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
wantErr: true,
},
}

parser := NewTestParser()
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
statements, err := parser.SplitStatements(tcase.input)
if tcase.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tcase.stmts, len(statements))
}
})
}
}

func TestSplitStatementToPieces(t *testing.T) {
testcases := []struct {
input string
Expand Down Expand Up @@ -745,6 +816,10 @@ func TestSplitStatementToPieces(t *testing.T) {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
output: "create table t1 ';';create table t2 (id",
}, {
// Ignore quoted semicolon
input: "stop replica; start replica",
output: "stop replica; start replica",
},
}

Expand Down
17 changes: 17 additions & 0 deletions go/vt/sqlparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sqlparser

import (
"errors"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -229,6 +230,22 @@ func (p *Parser) SplitStatement(blob string) (string, string, error) {
return blob, "", nil
}

// SplitStatements splits a given blob into multiple SQL statements.
func (p *Parser) SplitStatements(blob string) (statements []Statement, err error) {
tokenizer := p.NewStringTokenizer(blob)
for {
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
stmt, err := ParseNext(tokenizer)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
}
statements = append(statements, stmt)
}
return statements, nil
}

// SplitStatementToPieces split raw sql statement that may have multi sql pieces to sql pieces
// returns the sql pieces blob contains; or error if sql cannot be parsed
func (p *Parser) SplitStatementToPieces(blob string) (pieces []string, err error) {
Expand Down
20 changes: 20 additions & 0 deletions go/vt/sqlparser/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import (
"fmt"
"sort"
"strings"

querypb "vitess.io/vitess/go/vt/proto/query"
)
Expand Down Expand Up @@ -160,3 +161,22 @@
}
return query, nil
}

// ReplaceTableQualifiersMultiQuery accepts a multi-query string and modifies it
// via ReplaceTableQualifiers, one query at a time.
func (p *Parser) ReplaceTableQualifiersMultiQuery(multiQuery, olddb, newdb string) (string, error) {
queries, err := p.SplitStatementToPieces(multiQuery)
if err != nil {
return multiQuery, err
}

Check warning on line 171 in go/vt/sqlparser/utils.go

View check run for this annotation

Codecov / codecov/patch

go/vt/sqlparser/utils.go#L170-L171

Added lines #L170 - L171 were not covered by tests
var modifiedQueries []string
for _, query := range queries {
// Replace any provided sidecar database qualifiers with the correct one.
query, err := p.ReplaceTableQualifiers(query, olddb, newdb)
if err != nil {
return query, err
}
modifiedQueries = append(modifiedQueries, query)
}
return strings.Join(modifiedQueries, ";"), nil
}
Loading
Loading