Skip to content

Commit

Permalink
failover-semisync-state will toggle semisync leader and replica state…
Browse files Browse the repository at this point in the history
… during failover, rejoin, switchover. It's not enabled by default #453

autorejoin-script is now executed whatever rejoin method is used by replication
backup-save-script and backup-load-script can be used to shortcut the replication-manager backup and restore features; they receive the parameters host, leader_host, port, leader_port
  • Loading branch information
svaroqui committed Oct 20, 2022
1 parent f646755 commit 6e25bbe
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (cluster *Cluster) StateProcessing() {
}
}
if s.ErrKey == "WARN0101" {
cluster.LogPrintf(LvlInfo, "Cluster have backup")
cluster.LogPrintf(LvlInfo, "Cluster have backup")
for _, srv := range cluster.Servers {
if srv.HasWaitBackupCookie() {
cluster.LogPrintf(LvlInfo, "Server %s was waiting for backup", srv.URL)
Expand Down
5 changes: 5 additions & 0 deletions cluster/cluster_fail.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
logs, err := cluster.master.StopSlave()
cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", LvlErr, "Failed stopping slave on new master %s %s", cluster.master.URL, err)
}
if cluster.master.ClusterGroup.Conf.FailoverSemiSyncState {
cluster.LogPrintf("INFO", "Enable semisync leader and disable semisync replica on %s", cluster.master.URL)
logs, err := cluster.master.SetSemiSyncLeader()
cluster.LogSQL(logs, err, cluster.master.URL, "Rejoin", LvlErr, "Failed enable semisync leader and disable semisync replica on %s %s", cluster.master.URL, err)
}
}
cluster.Crashes = append(cluster.Crashes, crash)
t := time.Now()
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (cluster *Cluster) SetMonitoringAddress(value string) error {
}

// SetSchedulerDbServersLogicalBackupCron stores value as the cron expression
// of the logical backup schedule and then calls SetSchedulerBackupLogical so
// the logical backup scheduler is refreshed with the new setting.
//
// Only the logical schedule is written: the physical backup cron
// (Conf.BackupPhysicalCron) is left untouched, because overwriting it from the
// logical setter would silently reschedule physical backups.
//
// It always returns nil.
func (cluster *Cluster) SetSchedulerDbServersLogicalBackupCron(value string) error {
	cluster.Conf.BackupLogicalCron = value
	cluster.SetSchedulerBackupLogical()
	return nil
}
Expand Down
6 changes: 6 additions & 0 deletions cluster/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,12 @@ func (server *ServerMonitor) freeze() bool {
logs, err = dbhelper.FlushBinaryLogs(server.Conn)
server.ClusterGroup.LogSQL(logs, err, server.URL, "MasterFailover", LvlErr, "Could not flush binary logs on %s", server.URL)

if server.ClusterGroup.Conf.FailoverSemiSyncState {
server.ClusterGroup.LogPrintf("INFO", "Set semisync replica and disable semisync leader %s", server.URL)
logs, err := server.SetSemiSyncReplica()
server.ClusterGroup.LogSQL(logs, err, server.URL, "Rejoin", LvlErr, "Failed Set semisync replica and disable semisync %s, %s", server.URL, err)
}

return true
}

Expand Down
59 changes: 57 additions & 2 deletions cluster/srv_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) {
}

server.ClusterGroup.LogPrintf(LvlInfo, "Receive reseed logical backup %s request for server: %s", server.ClusterGroup.Conf.BackupPhysicalType, server.URL)
if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper {
if server.ClusterGroup.Conf.BackupLoadScript != "" {
go server.JobReseedBackupScript()
} else if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper {
go server.JobReseedMyLoader()
}
return jobid, err
Expand Down Expand Up @@ -418,6 +420,34 @@ func (server *ServerMonitor) JobReseedMyLoader() {

}

// JobReseedBackupScript reseeds this server by running the user supplied
// backup-load-script (Conf.BackupLoadScript).
//
// The script receives four positional arguments, in the same order as the
// other backup-load-script invocation in the rejoin code path: host,
// leader host, port, leader port. Hosts are passed through misc.Unbracket.
//
// Standard output and standard error of the script are forwarded through
// server.copyLogs. Every failure (no master, pipe creation, start, non-zero
// exit) is logged at error level and ends the job; nothing is returned to the
// caller because the job is usually started in its own goroutine.
func (server *ServerMonitor) JobReseedBackupScript() {
	master := server.ClusterGroup.master
	if master == nil {
		// Without a master there is no leader host/port to hand to the script.
		server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", "no master found to reseed from")
		return
	}

	cmd := exec.Command(server.ClusterGroup.Conf.BackupLoadScript, misc.Unbracket(server.Host), misc.Unbracket(master.Host), server.Port, master.Port)

	// Mask the database password in case it ends up on the command line.
	server.ClusterGroup.LogPrintf(LvlInfo, "Command backup load script: %s", strings.Replace(cmd.String(), server.ClusterGroup.dbPass, "XXXX", 1))

	stdoutIn, err := cmd.StdoutPipe()
	if err != nil {
		server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", err)
		return
	}
	stderrIn, err := cmd.StderrPipe()
	if err != nil {
		server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", err)
		return
	}
	if err := cmd.Start(); err != nil {
		server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", err)
		return
	}

	// Both pipes must be fully drained before cmd.Wait is called, otherwise
	// Wait may close them while output is still being read.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		server.copyLogs(stdoutIn)
	}()
	go func() {
		defer wg.Done()
		server.copyLogs(stderrIn)
	}()
	wg.Wait()

	if err := cmd.Wait(); err != nil {
		server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", err)
		return
	}
	server.ClusterGroup.LogPrintf(LvlInfo, "Finish logical restaure from load script on %s ", server.URL)
}

func (server *ServerMonitor) JobMyLoaderParseMeta(dir string) (config.MyDumperMetaData, error) {

var m config.MyDumperMetaData
Expand Down Expand Up @@ -615,7 +645,31 @@ func (server *ServerMonitor) JobBackupLogical() error {
server.ClusterGroup.LogSQL("BACKUP BLOCK_DDL", err, server.URL, "JobBackupLogical", LvlErr, "Failed SQL for server %s: %s ", server.URL, err)
server.ClusterGroup.LogPrintf(LvlInfo, "Blocking DDL via BACKUP STAGE")
}

if server.ClusterGroup.Conf.BackupSaveScript != "" {
scriptCmd := exec.Command(server.ClusterGroup.Conf.BackupSaveScript, server.Host, server.GetCluster().GetMaster().Host, server.Port, server.GetCluster().GetMaster().Port)
server.ClusterGroup.LogPrintf(LvlInfo, "Command: %s", strings.Replace(scriptCmd.String(), server.ClusterGroup.dbPass, "XXXX", 1))
stdoutIn, _ := scriptCmd.StdoutPipe()
stderrIn, _ := scriptCmd.StderrPipe()
scriptCmd.Start()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
server.copyLogs(stdoutIn)
}()
go func() {
defer wg.Done()
server.copyLogs(stderrIn)
}()
wg.Wait()
if err := scriptCmd.Wait(); err != nil {
server.ClusterGroup.LogPrintf(LvlErr, "Backup script error: %s", err)
return err
} else {
server.SetBackupLogicalCookie()
}
return nil
}
if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeRiver {
cfg := new(river.Config)
cfg.MyHost = server.URL
Expand Down Expand Up @@ -736,6 +790,7 @@ func (server *ServerMonitor) JobBackupLogical() error {
server.ClusterGroup.LogPrintf(LvlErr, "Dumpling %s", err)

}

if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper {
// --no-schemas --regex '^(?!(mysql))'

Expand Down
40 changes: 27 additions & 13 deletions cluster/srv_rejoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func (server *ServerMonitor) RejoinMaster() error {
if server.ClusterGroup.master != nil {
if server.URL != server.ClusterGroup.master.URL {
server.ClusterGroup.SetState("WARN0022", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0022"], server.URL, server.ClusterGroup.master.URL), ErrFrom: "REJOIN"})
server.RejoinScript()
if server.ClusterGroup.Conf.FailoverSemiSyncState {
server.ClusterGroup.LogPrintf("INFO", "Set semisync replica and disable semisync leader %s", server.URL)
logs, err := server.SetSemiSyncReplica()
server.ClusterGroup.LogSQL(logs, err, server.URL, "Rejoin", LvlErr, "Failed Set semisync replica and disable semisync %s, %s", server.URL, err)
}
crash := server.ClusterGroup.getCrashFromJoiner(server.URL)
if crash == nil {
server.ClusterGroup.SetState("ERR00066", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf(clusterError["ERR00066"], server.URL, server.ClusterGroup.master.URL), ErrFrom: "REJOIN"})
Expand Down Expand Up @@ -135,14 +141,14 @@ func (server *ServerMonitor) RejoinMasterSST() error {
server.JobFlashbackPhysicalBackup()
} else if server.ClusterGroup.Conf.AutorejoinZFSFlashback {
server.RejoinPreviousSnapshot()
} else if server.ClusterGroup.Conf.RejoinScript != "" {
server.ClusterGroup.LogPrintf("INFO", "Calling rejoin flashback script")
} else if server.ClusterGroup.Conf.BackupLoadScript != "" {
server.ClusterGroup.LogPrintf("INFO", "Calling restore script")
var out []byte
out, err := exec.Command(server.ClusterGroup.Conf.RejoinScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host)).CombinedOutput()
out, err := exec.Command(server.ClusterGroup.Conf.BackupLoadScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host), server.Port, server.GetCluster().GetMaster().Port).CombinedOutput()
if err != nil {
server.ClusterGroup.LogPrintf("ERROR", "%s", err)
}
server.ClusterGroup.LogPrintf("INFO", "Rejoin script complete %s", string(out))
server.ClusterGroup.LogPrintf("INFO", "Restore script complete %s", string(out))
} else {
server.ClusterGroup.LogPrintf("INFO", "No SST rejoin method found")
return errors.New("No SST rejoin flashback method found")
Expand All @@ -151,6 +157,20 @@ func (server *ServerMonitor) RejoinMasterSST() error {
return nil
}

// RejoinScript runs the user supplied rejoin script (Conf.RejoinScript), if one
// is configured, before the server is rejoined to the cluster.
//
// The script receives four positional arguments: host, leader host, port,
// leader port. Its combined stdout/stderr is captured and written to the
// cluster log once it exits. A failing script is logged at error level but
// does not abort the rejoin; the method returns nothing.
func (server *ServerMonitor) RejoinScript() {
	// Call pre-rejoin script
	if server.GetCluster().Conf.RejoinScript == "" {
		return
	}

	master := server.GetCluster().GetMaster()
	if master == nil {
		// The leader host and port are mandatory script arguments.
		server.ClusterGroup.LogPrintf(LvlErr, "%s", "Rejoin script skipped, no master found")
		return
	}

	server.ClusterGroup.LogPrintf("INFO", "Calling rejoin script")
	out, err := exec.Command(server.ClusterGroup.Conf.RejoinScript, server.Host, master.Host, server.Port, master.Port).CombinedOutput()
	if err != nil {
		server.ClusterGroup.LogPrintf(LvlErr, "%s", err)
	}
	// The output is logged even on failure, it usually explains the error.
	server.ClusterGroup.LogPrintf(LvlInfo, "Rejoin script complete: %s", string(out))
}

func (server *ServerMonitor) ReseedMasterSST() error {
server.DelWaitBackupCookie()
if server.ClusterGroup.Conf.AutorejoinMysqldump == true {
Expand All @@ -161,18 +181,12 @@ func (server *ServerMonitor) ReseedMasterSST() error {
return errors.New("Dump from master failed")
}
} else {
if server.ClusterGroup.Conf.AutorejoinLogicalBackup {
if server.ClusterGroup.Conf.BackupLoadScript != "" {
server.JobReseedBackupScript()
} else if server.ClusterGroup.Conf.AutorejoinLogicalBackup {
server.JobReseedLogicalBackup()
} else if server.ClusterGroup.Conf.AutorejoinPhysicalBackup {
server.JobReseedPhysicalBackup()
} else if server.ClusterGroup.Conf.RejoinScript != "" {
server.ClusterGroup.LogPrintf("INFO", "Calling rejoin script")
var out []byte
out, err := exec.Command(server.ClusterGroup.Conf.RejoinScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host)).CombinedOutput()
if err != nil {
server.ClusterGroup.LogPrintf("ERROR", "%s", err)
}
server.ClusterGroup.LogPrintf("INFO", "Rejoin script complete %s", string(out))
} else {
server.ClusterGroup.LogPrintf("INFO", "No SST reseed method found")
return errors.New("No SST reseed method found")
Expand Down
22 changes: 22 additions & 0 deletions cluster/srv_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,28 @@ func (server *ServerMonitor) SetPreferedBackup(pref bool) {
server.PreferedBackup = pref
}

// SetSemiSyncReplica switches this server to the semisync replica role by
// calling dbhelper.SetSemiSyncSlave, unless IsSemiSyncReplica already reports
// the server as a semisync replica.
//
// It returns the SQL text reported by the helper, for logging, together with
// the helper's error. When nothing had to be done it returns "" and nil.
func (server *ServerMonitor) SetSemiSyncReplica() (string, error) {
	if server.IsSemiSyncReplica() {
		return "", nil
	}
	// Return the helper's result directly so the executed statement is not
	// lost on success.
	return dbhelper.SetSemiSyncSlave(server.Conn, server.DBVersion)
}

// SetSemiSyncLeader switches this server to the semisync leader role by
// calling dbhelper.SetSemiSyncMaster, unless IsSemiSyncMaster already reports
// the server as a semisync master.
//
// It returns the SQL text reported by the helper, for logging, together with
// the helper's error. When nothing had to be done it returns "" and nil.
func (server *ServerMonitor) SetSemiSyncLeader() (string, error) {
	if server.IsSemiSyncMaster() {
		return "", nil
	}
	// Return the helper's result directly so the executed statement is not
	// lost on success.
	return dbhelper.SetSemiSyncMaster(server.Conn, server.DBVersion)
}

func (server *ServerMonitor) SetReadOnly() (string, error) {
logs := ""
if !server.IsReadOnly() {
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type Config struct {
PreScript string `mapstructure:"failover-pre-script" toml:"failover-pre-script" json:"failoverPreScript"`
PostScript string `mapstructure:"failover-post-script" toml:"failover-post-script" json:"failoverPostScript"`
ReadOnly bool `mapstructure:"failover-readonly-state" toml:"failover-readonly-state" json:"failoverReadOnlyState"`
FailoverSemiSyncState bool `mapstructure:"failover-semisync-state" toml:"failover-semisync-state" json:"failoverSemisyncState"`
SuperReadOnly bool `mapstructure:"failover-superreadonly-state" toml:"failover-superreadonly-state" json:"failoverSuperReadOnlyState"`
FailTime int64 `mapstructure:"failover-time-limit" toml:"failover-time-limit" json:"failoverTimeLimit"`
FailSync bool `mapstructure:"failover-at-sync" toml:"failover-at-sync" json:"failoverAtSync"`
Expand Down Expand Up @@ -462,6 +463,8 @@ type Config struct {
BackupPhysicalCron string `mapstructure:"scheduler-db-servers-physical-backup-cron" toml:"scheduler-db-servers-physical-backup-cron" json:"schedulerDbServersPhysicalBackupCron"`
BackupDatabaseLogCron string `mapstructure:"scheduler-db-servers-logs-cron" toml:"scheduler-db-servers-logs-cron" json:"schedulerDbServersLogsCron"`
BackupDatabaseOptimizeCron string `mapstructure:"scheduler-db-servers-optimize-cron" toml:"scheduler-db-servers-optimize-cron" json:"schedulerDbServersOptimizeCron"`
BackupSaveScript string `mapstructure:"backup-save-script" toml:"backup-save-script" json:"backupSaveScript"`
BackupLoadScript string `mapstructure:"backup-load-script" toml:"backup-load-script" json:"backupLoadScript"`
SchedulerDatabaseLogsTableRotate bool `mapstructure:"scheduler-db-servers-logs-table-rotate" toml:"scheduler-db-servers-logs-table-rotate" json:"schedulerDbServersLogsTableRotate"`
SchedulerDatabaseLogsTableRotateCron string `mapstructure:"scheduler-db-servers-logs-table-rotate-cron" toml:"scheduler-db-servers-logs-table-rotate-cron" json:"schedulerDbServersLogsTableRotateCron"`
SchedulerMaintenanceDatabaseLogsTableKeep int `mapstructure:"scheduler-db-servers-logs-table-keep" toml:"scheduler-db-servers-logs-table-keep" json:"schedulerDatabaseLogsTableKeep"`
Expand Down
4 changes: 4 additions & 0 deletions server/server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func init() {
monitorCmd.Flags().StringVar(&conf.PreScript, "failover-pre-script", "", "Path of pre-failover script")
monitorCmd.Flags().StringVar(&conf.PostScript, "failover-post-script", "", "Path of post-failover script")
monitorCmd.Flags().BoolVar(&conf.ReadOnly, "failover-readonly-state", true, "Failover Switchover set slaves as read-only")
monitorCmd.Flags().BoolVar(&conf.FailoverSemiSyncState, "failover-semisync-state", false, "Failover Switchover set semisync slave master state")
monitorCmd.Flags().BoolVar(&conf.SuperReadOnly, "failover-superreadonly-state", false, "Failover Switchover set slaves as super-read-only")
monitorCmd.Flags().StringVar(&conf.FailMode, "failover-mode", "manual", "Failover is manual or automatic")
monitorCmd.Flags().Int64Var(&conf.FailMaxDelay, "failover-max-slave-delay", 30, "Election ignore slave with replication delay over this time in sec")
Expand Down Expand Up @@ -366,6 +367,9 @@ func init() {
monitorCmd.Flags().IntVar(&conf.BackupKeepMonthly, "backup-keep-monthly", 12, "Keep this number of monthly backup")
monitorCmd.Flags().IntVar(&conf.BackupKeepYearly, "backup-keep-yearly", 2, "Keep this number of yearly backup")

monitorCmd.Flags().StringVar(&conf.BackupSaveScript, "backup-save-script", "", "Customized backup save script")
monitorCmd.Flags().StringVar(&conf.BackupLoadScript, "backup-load-script", "", "Customized backup load script")

monitorCmd.Flags().StringVar(&conf.BackupMyDumperPath, "backup-mydumper-path", "/usr/bin/mydumper", "Path to mydumper binary")
monitorCmd.Flags().StringVar(&conf.BackupMyLoaderPath, "backup-myloader-path", "/usr/bin/myloader", "Path to myloader binary")
monitorCmd.Flags().StringVar(&conf.BackupMyLoaderOptions, "backup-myloader-options", "--overwrite-tables --enable-binlog --verbose=3", "Extra options")
Expand Down
36 changes: 36 additions & 0 deletions utils/dbhelper/dbhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1957,6 +1957,42 @@ func SetMaxConnections(db *sqlx.DB, connections string, myver *MySQLVersion) (st
return query, err
}

// SetSemiSyncSlave turns the server behind db into a semisync replica: it
// first enables the replica side of semisync and then disables the
// master/source side.
//
// MySQL 8.0.26 and later 8.x releases use the rpl_semi_sync_replica_enabled /
// rpl_semi_sync_source_enabled variable names; every other server uses the
// legacy rpl_semi_sync_slave_enabled / rpl_semi_sync_master_enabled names.
// System variable names in a SET statement must be written with underscores,
// the dashed spelling is only valid for command line options.
//
// It returns the last statement that was sent to the server and the error of
// that statement. If enabling the replica side fails, the master side is left
// untouched.
func SetSemiSyncSlave(db *sqlx.DB, myver *MySQLVersion) (string, error) {
	useNewNames := myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26))

	query := "SET GLOBAL rpl_semi_sync_slave_enabled=1"
	if useNewNames {
		query = "SET GLOBAL rpl_semi_sync_replica_enabled=1"
	}
	if _, err := db.Exec(query); err != nil {
		return query, err
	}

	query = "SET GLOBAL rpl_semi_sync_master_enabled=0"
	if useNewNames {
		query = "SET GLOBAL rpl_semi_sync_source_enabled=0"
	}
	_, err := db.Exec(query)
	return query, err
}

// SetSemiSyncMaster turns the server behind db into a semisync leader: it
// first enables the master/source side of semisync and then disables the
// replica side.
//
// MySQL 8.0.26 and later 8.x releases use the rpl_semi_sync_source_enabled /
// rpl_semi_sync_replica_enabled variable names; every other server uses the
// legacy rpl_semi_sync_master_enabled / rpl_semi_sync_slave_enabled names.
// System variable names in a SET statement must be written with underscores,
// the dashed spelling is only valid for command line options.
//
// It returns the last statement that was sent to the server and the error of
// that statement. If enabling the master side fails, the replica side is left
// untouched.
func SetSemiSyncMaster(db *sqlx.DB, myver *MySQLVersion) (string, error) {
	useNewNames := myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26))

	query := "SET GLOBAL rpl_semi_sync_master_enabled=1"
	if useNewNames {
		query = "SET GLOBAL rpl_semi_sync_source_enabled=1"
	}
	if _, err := db.Exec(query); err != nil {
		return query, err
	}

	query = "SET GLOBAL rpl_semi_sync_slave_enabled=0"
	if useNewNames {
		query = "SET GLOBAL rpl_semi_sync_replica_enabled=0"
	}
	_, err := db.Exec(query)
	return query, err
}

func SetSlaveGTIDModeStrict(db *sqlx.DB, myver *MySQLVersion) (string, error) {
var err error
stmt := ""
Expand Down

0 comments on commit 6e25bbe

Please sign in to comment.