diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index a8c2de2e114..2c6fb2b867f 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -146,11 +146,10 @@ type flavor interface { // with parsed executed position. primaryStatus(c *Conn) (replication.PrimaryStatus, error) - // waitUntilPositionCommand returns the SQL command to issue - // to wait until the given position, until the context - // expires. The command returns -1 if it times out. It - // returns NULL if GTIDs are not enabled. - waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) + // waitUntilPosition waits until the given position is reached or + // until the context expires. It returns an error if we did not + // succeed. + waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error baseShowTables() string baseShowTablesWithSizes() string @@ -166,7 +165,8 @@ type CapableOf func(capability FlavorCapability) (bool, error) var flavors = make(map[string]func() flavor) // ServerVersionAtLeast returns true if current server is at least given value. -// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're on MySQL 8.0.23, 8.0.24, ... +// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're +// on MySQL 8.0.23, 8.0.24, ... func ServerVersionAtLeast(serverVersion string, parts ...int) (bool, error) { versionPrefix := strings.Split(serverVersion, "-")[0] versionTokens := strings.Split(versionPrefix, ".") @@ -448,21 +448,18 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) { return c.flavor.primaryStatus(c) } -// WaitUntilPositionCommand returns the SQL command to issue -// to wait until the given position, until the context -// expires. The command returns -1 if it times out. It -// returns NULL if GTIDs are not enabled. -func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { - return c.flavor.waitUntilPositionCommand(ctx, pos) +// WaitUntilPosition waits until the given position is reached or until the +// context expires. It returns an error if we did not succeed. +func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error { + return c.flavor.waitUntilPosition(ctx, c, pos) } -// WaitUntilFilePositionCommand returns the SQL command to issue -// to wait until the given position, until the context -// expires for the file position flavor. The command returns -1 if it times out. It -// returns NULL if GTIDs are not enabled. -func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// WaitUntilFilePosition waits until the given position is reached or until +// the context expires for the file position flavor. It returns an error if +// we did not succeed. +func (c *Conn) WaitUntilFilePosition(ctx context.Context, pos replication.Position) error { filePosFlavor := filePosFlavor{} - return filePosFlavor.waitUntilPositionCommand(ctx, pos) + return filePosFlavor.waitUntilPosition(ctx, c, pos) } // BaseShowTables returns a query that shows tables diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index bf4076b85b1..96939faf3c4 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -266,22 +266,54 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err return replication.ParseFilePosPrimaryStatus(resultMap) } -// waitUntilPositionCommand is part of the Flavor interface. -func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// waitUntilPosition is part of the Flavor interface. +func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID) if !ok { - return "", fmt.Errorf("Position is not filePos compatible: %#v", pos.GTIDSet) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "position is not filePos compatible: %#v", pos.GTIDSet) } + query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos) if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", fmt.Errorf("timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()), nil + query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()) } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos), nil + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For MASTER_POS_WAIT(), the return value is the number of log events + // the replica had to wait for to advance to the specified position. + // The function returns NULL if the replica SQL thread is not started, + // the replica's source information is not initialized, the arguments + // are incorrect, or an error occurs. It returns -1 if the timeout has + // been exceeded. If the replica SQL thread stops while MASTER_POS_WAIT() + // is waiting, the function returns NULL. If the replica is past the + // specified position, the function returns immediately. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) + } + val := result.Rows[0][0] + if val.IsNull() { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "replication is not running") + } + state, err := val.ToInt64() + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) + } + switch { + case state == -1: + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + case state >= 0: + return nil + default: + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) + } } func (*filePosFlavor) startReplicationUntilAfter(pos replication.Position) string { diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 15718542b45..77b1b4f1399 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -25,8 +25,9 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // mariadbFlavor implements the Flavor interface for MariaDB. @@ -48,7 +49,7 @@ func (mariadbFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr) } return replication.ParseMariadbGTIDSet(qr.Rows[0][0].ToString()) @@ -223,22 +224,45 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) return status, err } -// waitUntilPositionCommand is part of the Flavor interface. +// waitUntilPosition is part of the Flavor interface. // // Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even // if the sql thread stops. If that is a problem, we'll have to change this. -func (mariadbFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { + // Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means + // return immediately. + query := fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos) if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } - return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()), nil + query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()) } - // Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means - // return immediately. - return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos), nil + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For MASTER_GTID_WAIT(), if the wait completes without a timeout 0 is + // returned and -1 if there was a timeout. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) + } + val := result.Rows[0][0] + state, err := val.ToInt64() + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) + } + switch state { + case 0: + return nil + case -1: + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + default: + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) + } } // readBinlogEvent is part of the Flavor interface. diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index be11126ff9c..a3a449f5490 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -24,8 +24,9 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // mysqlFlavor implements the Flavor interface for Mysql. @@ -52,7 +53,7 @@ func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr) } return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString()) } @@ -65,7 +66,7 @@ func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr) } return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString()) } @@ -78,7 +79,7 @@ func (mysqlFlavor) serverUUID(c *Conn) (string, error) { return "", err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr) + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr) } return qr.Rows[0][0].ToString(), nil } @@ -90,7 +91,7 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) { return "", err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr) + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr) } return qr.Rows[0][0].ToString(), nil } @@ -135,7 +136,7 @@ func (mysqlFlavor) startSQLThreadCommand() string { func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) if !ok { - return vterrors.Errorf(vtrpc.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) } // Build the command. @@ -216,14 +217,14 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { return replication.ParseMysqlPrimaryStatus(resultMap) } -// waitUntilPositionCommand is part of the Flavor interface. -func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// waitUntilPosition is part of the Flavor interface. +func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { // A timeout of 0 means wait indefinitely. timeoutSeconds := 0 if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } // Only whole numbers of seconds are supported. @@ -234,7 +235,30 @@ func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication } } - return fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds), nil + query := fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds) + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where + // 0 represents success, and 1 represents timeout. Any other failures generate an error. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) + } + val := result.Rows[0][0] + state, err := val.ToInt64() + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) + } + switch state { + case 0: + return nil + case 1: + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + default: + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) + } } // readBinlogEvent is part of the Flavor interface. diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 23b19669f16..1dd03d901cb 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/hook" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vterrors" ) type ResetSuperReadOnlyFunc func() error @@ -315,7 +316,8 @@ func (mysqld *Mysqld) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, error) return resetFunc, nil } -// WaitSourcePos lets replicas wait to given replication position +// WaitSourcePos lets replicas wait for the given replication position to +// be reached. func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.Position) error { // Get a connection. conn, err := getPoolReconnect(ctx, mysqld.dbaPool) @@ -324,61 +326,33 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P } defer conn.Recycle() - // First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection, - // unless that flavor is also filePos. - waitCommandName := "WaitUntilPositionCommand" - var query string + // First check if filePos flavored Position was passed in. If so, we + // can't defer to the flavor in the connection, unless that flavor is + // also filePos. if targetPos.MatchesFlavor(replication.FilePosFlavorID) { - // If we are the primary, WaitUntilFilePositionCommand will fail. - // But position is most likely reached. So, check the position - // first. + // If we are the primary, WaitUntilFilePosition will fail. But + // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryFilePosition() if err != nil { - return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err) + return vterrors.Wrapf(err, "WaitSourcePos: PrimaryFilePosition failed") } if mpos.AtLeast(targetPos) { return nil } - - // Find the query to run, run it. - query, err = conn.Conn.WaitUntilFilePositionCommand(ctx, targetPos) - if err != nil { - return err - } - waitCommandName = "WaitUntilFilePositionCommand" } else { - // If we are the primary, WaitUntilPositionCommand will fail. - // But position is most likely reached. So, check the position - // first. + // If we are the primary, WaitUntilPosition will fail. But + // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryPosition() if err != nil { - return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err) + return vterrors.Wrapf(err, "WaitSourcePos: PrimaryPosition failed") } if mpos.AtLeast(targetPos) { return nil } - - // Find the query to run, run it. - query, err = conn.Conn.WaitUntilPositionCommand(ctx, targetPos) - if err != nil { - return err - } } - qr, err := mysqld.FetchSuperQuery(ctx, query) - if err != nil { - return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err) - } - - if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr) - } - result := qr.Rows[0][0] - if result.IsNull() { - return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query) - } - if result.ToString() == "-1" { - return fmt.Errorf("timed out waiting for position %v", targetPos) + if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil { + return vterrors.Wrapf(err, "WaitSourcePos failed") } return nil }