Skip to content

Commit

Permalink
VTOrc checks and fixes replication misconfiguration issues (#15881)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 authored May 8, 2024
1 parent 0df9ad0 commit cbf89bd
Show file tree
Hide file tree
Showing 54 changed files with 1,614 additions and 478 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
}

// Stop replication (in case we're restarting), set replication source, and start replication.
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, 0, true, true); err != nil {
return vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ type vtcomboMysqld struct {
}

// SetReplicationSource implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error {
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,9 @@ func IsNum(typ uint8) bool {
typ == typeYear ||
typ == typeNewDecimal
}

const (
readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration"
readReplicaNetTimeout = "select @@global.replica_net_timeout"
readSlaveNetTimeout = "select @@global.slave_net_timeout"
)
28 changes: 25 additions & 3 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/replicationdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)
Expand Down Expand Up @@ -120,7 +121,7 @@ type flavor interface {

// setReplicationSourceCommand returns the command to use the provided host/port
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string
setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string

// status returns the result of the appropriate status command,
// with parsed replication position.
Expand All @@ -130,6 +131,11 @@ type flavor interface {
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

// replicationConfiguration reads the right global variables and performance schema information.
replicationConfiguration(c *Conn) (*replicationdata.Configuration, error)

replicationNetTimeout(c *Conn) (int32, error)

// waitUntilPosition waits until the given position is reached or
// until the context expires. It returns an error if we did not
// succeed.
Expand Down Expand Up @@ -364,8 +370,8 @@ func (c *Conn) SetReplicationPositionCommands(pos replication.Position) []string
// as the new replication source (without changing any GTID position).
// It is guaranteed to be called with replication stopped.
// It should not start or stop replication.
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
return c.flavor.setReplicationSourceCommand(params, host, port, connectRetry)
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
return c.flavor.setReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
}

// resultToMap is a helper function used by ShowReplicationStatus.
Expand Down Expand Up @@ -400,6 +406,22 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
}

// ReplicationConfiguration reads the right global variables and performance schema information.
func (c *Conn) ReplicationConfiguration() (*replicationdata.Configuration, error) {
replConfiguration, err := c.flavor.replicationConfiguration(c)
// We don't want to fail this call if it called on a primary tablet.
// There just isn't any replication configuration to return since it is a primary tablet.
if err == ErrNotReplica {
return nil, nil
}
if err != nil {
return nil, err
}
replNetTimeout, err := c.flavor.replicationNetTimeout(c)
replConfiguration.ReplicaNetTimeout = replNetTimeout
return replConfiguration, err
}

// WaitUntilPosition waits until the given position is reached or until the
// context expires. It returns an error if we did not succeed.
func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error {
Expand Down
11 changes: 10 additions & 1 deletion go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/replicationdata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)
Expand Down Expand Up @@ -228,7 +229,7 @@ func (flv *filePosFlavor) setReplicationPositionCommands(pos replication.Positio
}

// setReplicationSourceCommand is part of the Flavor interface.
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
return "unsupported"
}

Expand Down Expand Up @@ -271,6 +272,14 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err
return replication.ParseFilePosPrimaryStatus(resultMap)
}

func (flv *filePosFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configuration, error) {
return nil, nil
}

func (flv *filePosFlavor) replicationNetTimeout(c *Conn) (int32, error) {
return 0, nil
}

// waitUntilPosition is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID)
Expand Down
45 changes: 44 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/replicationdata"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -187,7 +189,7 @@ func (mariadbFlavor) setReplicationPositionCommands(pos replication.Position) []
}
}

func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
Expand All @@ -210,6 +212,9 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "MASTER_USE_GTID = current_pos")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}
Expand Down Expand Up @@ -255,6 +260,44 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error)
return status, err
}

// replicationConfiguration is part of the Flavor interface.
func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configuration, error) {
qr, err := c.ExecuteFetch(readReplicationConnectionConfiguration, 100, true /* wantfields */)
if err != nil {
return nil, err
}
if len(qr.Rows) == 0 {
// The query returned no data. This is not a replica.
return nil, ErrNotReplica
}

resultMap, err := resultToMap(qr)
if err != nil {
return nil, err
}

heartbeatInterval, err := strconv.ParseFloat(resultMap["HEARTBEAT_INTERVAL"], 64)
if err != nil {
return nil, err
}

return &replicationdata.Configuration{
HeartbeatInterval: heartbeatInterval,
}, nil
}

// replicationNetTimeout is part of the Flavor interface.
func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false)
if err != nil {
return 0, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr)
}
return qr.Rows[0][0].ToInt32()
}

// waitUntilPosition is part of the Flavor interface.
//
// Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even
Expand Down
17 changes: 15 additions & 2 deletions go/mysql/flavor_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,22 @@ func TestMariadbSetReplicationSourceCommand(t *testing.T) {
MASTER_USE_GTID = current_pos`

conn := &Conn{flavor: mariadbFlavor101{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

var heartbeatInterval float64 = 5.4
want = `CHANGE MASTER TO
MASTER_HOST = 'localhost',
MASTER_PORT = 123,
MASTER_USER = 'username',
MASTER_PASSWORD = 'password',
MASTER_CONNECT_RETRY = 1234,
MASTER_HEARTBEAT_PERIOD = 5.4,
MASTER_USE_GTID = current_pos`

got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want)

}

func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) {
Expand Down Expand Up @@ -71,7 +84,7 @@ func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) {
MASTER_USE_GTID = current_pos`

conn := &Conn{flavor: mariadbFlavor101{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

}
62 changes: 60 additions & 2 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/replicationdata"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -300,6 +302,56 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) {
return replication.ParseMysqlPrimaryStatus(resultMap)
}

// replicationConfiguration is part of the Flavor interface.
func (mysqlFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configuration, error) {
qr, err := c.ExecuteFetch(readReplicationConnectionConfiguration, 100, true /* wantfields */)
if err != nil {
return nil, err
}
if len(qr.Rows) == 0 {
// The query returned no data. This is not a replica.
return nil, ErrNotReplica
}

resultMap, err := resultToMap(qr)
if err != nil {
return nil, err
}

heartbeatInterval, err := strconv.ParseFloat(resultMap["HEARTBEAT_INTERVAL"], 64)
if err != nil {
return nil, err
}

return &replicationdata.Configuration{
HeartbeatInterval: heartbeatInterval,
}, nil
}

// replicationNetTimeout is part of the Flavor interface.
func (mysqlFlavor) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false)
if err != nil {
return 0, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr)
}
return qr.Rows[0][0].ToInt32()
}

// replicationNetTimeout is part of the Flavor interface.
func (mysqlFlavor8) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readReplicaNetTimeout, 1, false)
if err != nil {
return 0, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for replica_net_timeout: %#v", qr)
}
return qr.Rows[0][0].ToInt32()
}

// status is part of the Flavor interface.
func (mysqlFlavor8) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW REPLICA STATUS", 100, true /* wantfields */)
Expand Down Expand Up @@ -482,7 +534,7 @@ func (f mysqlFlavor8) supportsCapability(capability capabilities.FlavorCapabilit
return capabilities.MySQLVersionHasCapability(f.serverVersion, capability)
}

func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
Expand All @@ -505,11 +557,14 @@ func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string,
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "MASTER_AUTO_POSITION = 1")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("SOURCE_HOST = '%s'", host),
fmt.Sprintf("SOURCE_PORT = %d", port),
Expand All @@ -532,6 +587,9 @@ func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string,
if params.SslKey != "" {
args = append(args, fmt.Sprintf("SOURCE_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("SOURCE_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "SOURCE_AUTO_POSITION = 1")
return "CHANGE REPLICATION SOURCE TO\n " + strings.Join(args, ",\n ")
}
Expand Down
Loading

0 comments on commit cbf89bd

Please sign in to comment.