Skip to content

Commit

Permalink
Fixing multi source failover for MySQL #459
Browse files Browse the repository at this point in the history
Fixing super read only for MySQL #459
  • Loading branch information
svaroqui committed Dec 29, 2022
1 parent 209a8f9 commit 695a5a4
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 55 deletions.
10 changes: 5 additions & 5 deletions cluster/cluster_fail.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,12 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
}

if cluster.Conf.ReadOnly {
logs, err = dbhelper.SetReadOnly(cluster.oldMaster.Conn, true)
logs, err = cluster.oldMaster.SetReadOnly()
cluster.LogSQL(logs, err, cluster.oldMaster.URL, "MasterFailover", LvlErr, "Could not set old master as read-only, %s", err)

} else {
logs, err = dbhelper.SetReadOnly(cluster.oldMaster.Conn, false)
cluster.LogSQL(logs, err, cluster.oldMaster.URL, "MasterFailover", LvlErr, "Could not set old master as read-write, %s", err)
/* } else {
logs, err = cluster.oldMaster.SetReadWrite()
cluster.LogSQL(logs, err, cluster.oldMaster.URL, "MasterFailover", LvlErr, "Could not set old master as read-write, %s", err)
*/
}
if cluster.Conf.SwitchDecreaseMaxConn {

Expand Down
94 changes: 52 additions & 42 deletions cluster/prx_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,48 +166,50 @@ func (proxy *Proxy) GetEnv() map[string]string {

func (proxy *Proxy) GetBaseEnv() map[string]string {
return map[string]string{
"%%ENV:NODES_CPU_CORES%%": proxy.ClusterGroup.Conf.ProvCores,
"%%ENV:SVC_CONF_ENV_MAX_CORES%%": proxy.ClusterGroup.Conf.ProvCores,
"%%ENV:SVC_CONF_ENV_CRC32_ID%%": string(proxy.Id[2:10]),
"%%ENV:SVC_CONF_ENV_SERVER_ID%%": string(proxy.Id[2:10]),
"%%ENV:SVC_CONF_ENV_MYSQL_ROOT_PASSWORD%%": proxy.ClusterGroup.dbPass,
"%%ENV:SVC_CONF_ENV_MYSQL_ROOT_USER%%": proxy.ClusterGroup.dbUser,
"%%ENV:SERVER_IP%%": proxy.GetBindAddress(),
"%%ENV:EXTRA_BIND_SERVER_IPV6%%": proxy.GetBindAddressExtraIPV6(),
"%%ENV:SVC_CONF_ENV_PROXY_USE_SSL%%": proxy.GetUseSSL(),
"%%ENV:CAUSAL_READ%%": proxy.GetCausalRead(),
"%%ENV:SVC_CONF_ENV_PROXY_USE_COMPRESS%%": proxy.GetUseCompression(),
"%%ENV:SERVER_PORT%%": proxy.Port,
"%%ENV:SVC_NAMESPACE%%": proxy.ClusterGroup.Name,
"%%ENV:SVC_NAME%%": proxy.Name,
"%%ENV:SERVERS_HAPROXY_WRITE%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_HAPROXY_WRITE%%"),
"%%ENV:SERVERS_HAPROXY_READ%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_HAPROXY_READ%%"),
"%%ENV:SERVERS_HAPROXY_WRITE_BACKEND%%": proxy.ClusterGroup.Conf.HaproxyAPIWriteBackend,
"%%ENV:SERVERS_HAPROXY_READ_BACKEND%%": proxy.ClusterGroup.Conf.HaproxyAPIReadBackend,
"%%ENV:SVC_CONF_HAPROXY_DNS%%": proxy.GetConfigProxyDNS(),
"%%ENV:SERVERS_PROXYSQL%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_PROXYSQL%%"),
"%%ENV:SERVERS%%": proxy.GetConfigProxyModule("%%ENV:SERVERS%%"),
"%%ENV:SERVERS_LIST%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_LIST%%"),
"%%ENV:SVC_CONF_ENV_PORT_HTTP%%": "80",
"%%ENV:SVC_CONF_ENV_PORT_R_LB%%": strconv.Itoa(proxy.ReadPort),
"%%ENV:SVC_CONF_ENV_PORT_RW%%": strconv.Itoa(proxy.WritePort),
"%%ENV:SVC_CONF_ENV_MAXSCALE_MAXINFO_PORT%%": strconv.Itoa(proxy.ClusterGroup.Conf.MxsMaxinfoPort),
"%%ENV:SVC_CONF_ENV_PORT_RW_SPLIT%%": strconv.Itoa(proxy.ReadWritePort),
"%%ENV:SVC_CONF_ENV_PORT_BINLOG%%": strconv.Itoa(proxy.ClusterGroup.Conf.MxsBinlogPort),
"%%ENV:SVC_CONF_ENV_PORT_TELNET%%": proxy.Port,
"%%ENV:SVC_CONF_ENV_PORT_ADMIN%%": proxy.Port,
"%%ENV:SVC_CONF_ENV_USER_ADMIN%%": proxy.User,
"%%ENV:SVC_CONF_ENV_PASSWORD_ADMIN%%": proxy.Pass,
"%%ENV:SVC_CONF_ENV_SPHINX_MEM%%": proxy.ClusterGroup.Conf.ProvSphinxMem,
"%%ENV:SVC_CONF_ENV_SPHINX_MAX_CHILDREN%%": proxy.ClusterGroup.Conf.ProvSphinxMaxChildren,
"%%ENV:SVC_CONF_ENV_VIP_ADDR%%": proxy.ClusterGroup.Conf.ProvProxRouteAddr,
"%%ENV:SVC_CONF_ENV_VIP_NETMASK%%": proxy.ClusterGroup.Conf.ProvProxRouteMask,
"%%ENV:SVC_CONF_ENV_VIP_PORT%%": proxy.ClusterGroup.Conf.ProvProxRoutePort,
"%%ENV:SVC_CONF_ENV_MRM_API_ADDR%%": proxy.ClusterGroup.Conf.MonitorAddress + ":" + proxy.ClusterGroup.Conf.HttpPort,
"%%ENV:SVC_CONF_ENV_MRM_CLUSTER_NAME%%": proxy.ClusterGroup.GetClusterName(),
"%%ENV:SVC_CONF_ENV_DATADIR%%": proxy.GetConfigDatadir(),
"%%ENV:SVC_CONF_ENV_CONFDIR%%": proxy.GetConfigConfigdir(),
"%%ENV:SVC_CONF_ENV_PROXYSQL_READ_ON_MASTER%%": proxy.GetConfigProxySQLReadOnMaster(),
"%%ENV:NODES_CPU_CORES%%": proxy.ClusterGroup.Conf.ProvCores,
"%%ENV:SVC_CONF_ENV_MAX_CORES%%": proxy.ClusterGroup.Conf.ProvCores,
"%%ENV:SVC_CONF_ENV_CRC32_ID%%": string(proxy.Id[2:10]),
"%%ENV:SVC_CONF_ENV_SERVER_ID%%": string(proxy.Id[2:10]),
"%%ENV:SVC_CONF_ENV_MYSQL_ROOT_PASSWORD%%": proxy.ClusterGroup.dbPass,
"%%ENV:SVC_CONF_ENV_MYSQL_ROOT_USER%%": proxy.ClusterGroup.dbUser,
"%%ENV:SERVER_IP%%": proxy.GetBindAddress(),
"%%ENV:EXTRA_BIND_SERVER_IPV6%%": proxy.GetBindAddressExtraIPV6(),
"%%ENV:SVC_CONF_ENV_PROXY_USE_SSL%%": proxy.GetUseSSL(),
"%%ENV:CAUSAL_READ%%": proxy.GetCausalRead(),
"%%ENV:SVC_CONF_ENV_PROXY_USE_COMPRESS%%": proxy.GetUseCompression(),
"%%ENV:SERVER_PORT%%": proxy.Port,
"%%ENV:SVC_NAMESPACE%%": proxy.ClusterGroup.Name,
"%%ENV:SVC_NAME%%": proxy.Name,
"%%ENV:SERVERS_HAPROXY_WRITE%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_HAPROXY_WRITE%%"),
"%%ENV:SERVERS_HAPROXY_READ%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_HAPROXY_READ%%"),
"%%ENV:SERVERS_HAPROXY_WRITE_BACKEND%%": proxy.ClusterGroup.Conf.HaproxyAPIWriteBackend,
"%%ENV:SERVERS_HAPROXY_READ_BACKEND%%": proxy.ClusterGroup.Conf.HaproxyAPIReadBackend,
"%%ENV:SVC_CONF_HAPROXY_DNS%%": proxy.GetConfigProxyDNS(),
"%%ENV:SERVERS_PROXYSQL%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_PROXYSQL%%"),
"%%ENV:SERVERS%%": proxy.GetConfigProxyModule("%%ENV:SERVERS%%"),
"%%ENV:SERVERS_LIST%%": proxy.GetConfigProxyModule("%%ENV:SERVERS_LIST%%"),
"%%ENV:SVC_CONF_ENV_PORT_HTTP%%": "80",
"%%ENV:SVC_CONF_ENV_PORT_R_LB%%": strconv.Itoa(proxy.ReadPort),
"%%ENV:SVC_CONF_ENV_PORT_RW%%": strconv.Itoa(proxy.WritePort),
"%%ENV:SVC_CONF_ENV_MAXSCALE_MAXINFO_PORT%%": strconv.Itoa(proxy.ClusterGroup.Conf.MxsMaxinfoPort),
"%%ENV:SVC_CONF_ENV_PORT_RW_SPLIT%%": strconv.Itoa(proxy.ReadWritePort),
"%%ENV:SVC_CONF_ENV_PORT_BINLOG%%": strconv.Itoa(proxy.ClusterGroup.Conf.MxsBinlogPort),
"%%ENV:SVC_CONF_ENV_PORT_TELNET%%": proxy.Port,
"%%ENV:SVC_CONF_ENV_PORT_ADMIN%%": proxy.Port,
"%%ENV:SVC_CONF_ENV_USER_ADMIN%%": proxy.User,
"%%ENV:SVC_CONF_ENV_PASSWORD_ADMIN%%": proxy.Pass,
"%%ENV:SVC_CONF_ENV_SPHINX_MEM%%": proxy.ClusterGroup.Conf.ProvSphinxMem,
"%%ENV:SVC_CONF_ENV_SPHINX_MAX_CHILDREN%%": proxy.ClusterGroup.Conf.ProvSphinxMaxChildren,
"%%ENV:SVC_CONF_ENV_VIP_ADDR%%": proxy.ClusterGroup.Conf.ProvProxRouteAddr,
"%%ENV:SVC_CONF_ENV_VIP_NETMASK%%": proxy.ClusterGroup.Conf.ProvProxRouteMask,
"%%ENV:SVC_CONF_ENV_VIP_PORT%%": proxy.ClusterGroup.Conf.ProvProxRoutePort,
"%%ENV:SVC_CONF_ENV_MRM_API_ADDR%%": proxy.ClusterGroup.Conf.MonitorAddress + ":" + proxy.ClusterGroup.Conf.HttpPort,
"%%ENV:SVC_CONF_ENV_MRM_CLUSTER_NAME%%": proxy.ClusterGroup.GetClusterName(),
"%%ENV:SVC_CONF_ENV_DATADIR%%": proxy.GetConfigDatadir(),
"%%ENV:SVC_CONF_ENV_CONFDIR%%": proxy.GetConfigConfigdir(),
"%%ENV:SVC_CONF_ENV_PROXYSQL_READ_ON_MASTER%%": proxy.GetConfigProxySQLReadOnMaster(),
"%%ENV:SVC_CONF_ENV_PROXYSQL_READER_HOSTGROUP%%": proxy.GetConfigProxySQLReaderHostgroup(),
"%%ENV:SVC_CONF_ENV_PROXYSQL_WRITER_HOSTGROUP%%": proxy.GetConfigProxySQLWriterHostgroup(),
}
}

Expand All @@ -218,6 +220,14 @@ func (proxy *Proxy) GetConfigProxySQLReadOnMaster() string {
return "0"
}

func (proxy *Proxy) GetConfigProxySQLReaderHostgroup() string {
return strconv.Itoa(proxy.ReaderHostgroup)
}

func (proxy *Proxy) GetConfigProxySQLWriterHostgroup() string {
return strconv.Itoa(proxy.WriterHostgroup)
}

func (proxy *Proxy) GetConfigProxyDNS() string {
if proxy.HasDNS() {
return `
Expand Down
2 changes: 1 addition & 1 deletion cluster/srv_rejoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (server *ServerMonitor) rejoinSlave(ss dbhelper.SlaveStatus) error {
logs, err := mycurrentmaster.StopSlave()
server.ClusterGroup.LogSQL(logs, err, mycurrentmaster.URL, "Rejoin", LvlErr, "Failed to stop slave on relay server %s: %s", mycurrentmaster.URL, err)
if err == nil {
logs, err2 := dbhelper.MasterPosWait(server.Conn, mycurrentmaster.BinaryLogFile, mycurrentmaster.BinaryLogPos, 3600)
logs, err2 := dbhelper.MasterPosWait(server.Conn, server.DBVersion, mycurrentmaster.BinaryLogFile, mycurrentmaster.BinaryLogPos, 3600, server.ClusterGroup.Conf.MasterConn)
server.ClusterGroup.LogSQL(logs, err2, server.URL, "Rejoin", LvlErr, "Failed positional rejoin wait pos %s %s", server.URL, err2)
if err2 == nil {
myparentss, _ := mycurrentmaster.GetSlaveStatus(mycurrentmaster.ReplicationSourceName)
Expand Down
2 changes: 1 addition & 1 deletion cluster/srv_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (server *ServerMonitor) WaitSyncToMaster(master *ServerMonitor) {
server.ClusterGroup.LogSQL(logs, err, server.URL, "MasterFailover", LvlErr, "Failed MasterWaitGTID, %s", err)

} else {
logs, err := dbhelper.MasterPosWait(server.Conn, master.BinaryLogFile, master.BinaryLogPos, 30)
logs, err := dbhelper.MasterPosWait(server.Conn, server.DBVersion, master.BinaryLogFile, master.BinaryLogPos, 30, server.ClusterGroup.Conf.MasterConn)
server.ClusterGroup.LogSQL(logs, err, server.URL, "MasterFailover", LvlErr, "Failed MasterPosWait, %s", err)
}

Expand Down
45 changes: 39 additions & 6 deletions utils/dbhelper/dbhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type MasterStatus struct {

type SlaveStatus struct {
ConnectionName sql.NullString `db:"Connection_name" json:"connectionName"`
ChannelName sql.NullString `db:"Channel_Name" json:"channelName"`
MasterHost sql.NullString `db:"Master_Host" json:"masterHost"`
MasterUser sql.NullString `db:"Master_User" json:"masterUser"`
MasterPort sql.NullString `db:"Master_Port" json:"masterPort"`
Expand Down Expand Up @@ -870,6 +871,13 @@ func GetSlaveStatus(db *sqlx.DB, Channel string, myver *MySQLVersion) (SlaveStat
err = udb.Get(&ss, query)
}
}
//
if ss.ChannelName.Valid {
if ss.ChannelName.String != "" {
ss.ConnectionName.String = ss.ChannelName.String
ss.ConnectionName.Valid = true
}
}

return ss, query, err
}
Expand All @@ -879,7 +887,20 @@ func GetChannelSlaveStatus(db *sqlx.DB, myver *MySQLVersion) ([]SlaveStatus, str
udb := db.Unsafe()
ss := []SlaveStatus{}
err := udb.Select(&ss, "SHOW SLAVE STATUS")
return ss, "SHOW SLAVE STATUS", err
// Unified MariaDB MySQL ConnectionName and ChannelName
uniss := []SlaveStatus{}
if err == nil {
for _, s := range ss {
if s.ChannelName.Valid {
if s.ChannelName.String != "" {
s.ConnectionName.String = s.ChannelName.String
s.ConnectionName.Valid = true
}
}
uniss = append(uniss, s)
}
}
return uniss, "SHOW SLAVE STATUS", err
}

func GetPGSlaveStatus(db *sqlx.DB, myver *MySQLVersion) ([]SlaveStatus, error) {
Expand All @@ -904,6 +925,7 @@ func GetPGSlaveStatus(db *sqlx.DB, myver *MySQLVersion) ([]SlaveStatus, error) {
ORDER BY pid ASC`

err := udb.Select(&ss, query)

return ss, err
}

Expand Down Expand Up @@ -1014,7 +1036,6 @@ func GetAllSlavesStatus(db *sqlx.DB, myver *MySQLVersion) ([]SlaveStatus, string
(SELECT count(*) as nbrep FROM pg_stat_subscription) AS sqt `
}
err = udb.Select(&ss, query)

return ss, query, err
}

Expand Down Expand Up @@ -2214,10 +2235,22 @@ func MasterWaitGTID(db *sqlx.DB, gtid string, timeout int) (string, error) {
return query + "(" + gtid + "-" + strconv.Itoa(timeout) + ")", err
}

func MasterPosWait(db *sqlx.DB, log string, pos string, timeout int) (string, error) {
query := "SELECT MASTER_POS_WAIT(?, ?, ?)"
_, err := db.Exec(query, log, pos, timeout)
return query + "(" + log + "-" + pos + "-" + strconv.Itoa(timeout) + ")", err
func MasterPosWait(db *sqlx.DB, myver *MySQLVersion, log string, pos string, timeout int, channel string) (string, error) {
// SOURCE_POS_WAIT before MySQL 8.0.26
funcname := "MASTER_POS_WAIT"
if (myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26) {
funcname = "SOURCE_POS_WAIT"
}

if channel == "" {
query := "SELECT " + funcname + "(?, ?, ?)"
_, err := db.Exec(query, log, pos, timeout)
return query + "(" + log + "-" + pos + "-" + strconv.Itoa(timeout) + ")", err
} else {
query := "SELECT " + funcname + "(?, ?, ?, ?)"
_, err := db.Exec(query, log, pos, timeout, channel)
return query + "(" + log + "-" + pos + "-" + strconv.Itoa(timeout) + ")", err
}
}

func SetReadOnly(db *sqlx.DB, flag bool) (string, error) {
Expand Down

0 comments on commit 695a5a4

Please sign in to comment.