From 6e25bbefc7e37211efd2ba1f0118b1422885f1a4 Mon Sep 17 00:00:00 2001 From: apple Date: Thu, 20 Oct 2022 16:48:37 +0200 Subject: [PATCH] failover-semisync-state will toggle semisync leader and replica state during failover, rejoin, switchover. It's not enabled by default #453 autorejoin-script is now executed whatever rejoin method is used by replication backup-save-script and backup-load-script can be used to shortcut replication-manager backup restore features. Script parameters: host, leader_host, port, leader_port --- cluster/cluster.go | 2 +- cluster/cluster_fail.go | 5 ++++ cluster/cluster_set.go | 2 +- cluster/srv.go | 6 ++++ cluster/srv_job.go | 59 ++++++++++++++++++++++++++++++++++++-- cluster/srv_rejoin.go | 40 +++++++++++++++++--------- cluster/srv_set.go | 22 ++++++++++++++ config/config.go | 3 ++ server/server_monitor.go | 4 +++ utils/dbhelper/dbhelper.go | 36 +++++++++++++++++++++++ 10 files changed, 162 insertions(+), 17 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index dce046dd5..b0675cf5d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -550,7 +550,7 @@ func (cluster *Cluster) StateProcessing() { } } if s.ErrKey == "WARN0101" { - cluster.LogPrintf(LvlInfo, "Cluster have backup") + cluster.LogPrintf(LvlInfo, "Cluster have backup") for _, srv := range cluster.Servers { if srv.HasWaitBackupCookie() { cluster.LogPrintf(LvlInfo, "Server %s was waiting for backup", srv.URL) diff --git a/cluster/cluster_fail.go b/cluster/cluster_fail.go index 537f91655..7fe56f042 100644 --- a/cluster/cluster_fail.go +++ b/cluster/cluster_fail.go @@ -215,6 +215,11 @@ func (cluster *Cluster) MasterFailover(fail bool) bool { logs, err := cluster.master.StopSlave() cluster.LogSQL(logs, err, cluster.master.URL, "MasterFailover", LvlErr, "Failed stopping slave on new master %s %s", cluster.master.URL, err) } + if cluster.master.ClusterGroup.Conf.FailoverSemiSyncState { + cluster.LogPrintf("INFO", "Enable semisync leader and disable semisync replica on 
%s", cluster.master.URL) + logs, err := cluster.master.SetSemiSyncLeader() + cluster.LogSQL(logs, err, cluster.master.URL, "Rejoin", LvlErr, "Failed enable semisync leader and disable semisync replica on %s %s", cluster.master.URL, err) + } } cluster.Crashes = append(cluster.Crashes, crash) t := time.Now() diff --git a/cluster/cluster_set.go b/cluster/cluster_set.go index 088480068..a5c942eb9 100644 --- a/cluster/cluster_set.go +++ b/cluster/cluster_set.go @@ -861,7 +861,7 @@ func (cluster *Cluster) SetMonitoringAddress(value string) error { } func (cluster *Cluster) SetSchedulerDbServersLogicalBackupCron(value string) error { - cluster.Conf.BackupPhysicalCron = value + cluster.Conf.BackupLogicalCron = value cluster.SetSchedulerBackupLogical() return nil } diff --git a/cluster/srv.go b/cluster/srv.go index 9779b97f8..df45de354 100644 --- a/cluster/srv.go +++ b/cluster/srv.go @@ -911,6 +911,12 @@ func (server *ServerMonitor) freeze() bool { logs, err = dbhelper.FlushBinaryLogs(server.Conn) server.ClusterGroup.LogSQL(logs, err, server.URL, "MasterFailover", LvlErr, "Could not flush binary logs on %s", server.URL) + if server.ClusterGroup.Conf.FailoverSemiSyncState { + server.ClusterGroup.LogPrintf("INFO", "Set semisync replica and disable semisync leader %s", server.URL) + logs, err := server.SetSemiSyncReplica() + server.ClusterGroup.LogSQL(logs, err, server.URL, "Rejoin", LvlErr, "Failed Set semisync replica and disable semisync %s, %s", server.URL, err) + } + return true } diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 84b22efc0..3df0d7712 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -277,7 +277,9 @@ func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) { } server.ClusterGroup.LogPrintf(LvlInfo, "Receive reseed logical backup %s request for server: %s", server.ClusterGroup.Conf.BackupPhysicalType, server.URL) - if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { + if 
server.ClusterGroup.Conf.BackupLoadScript != "" { + go server.JobReseedBackupScript() + } else if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { go server.JobReseedMyLoader() } return jobid, err @@ -418,6 +420,34 @@ func (server *ServerMonitor) JobReseedMyLoader() { } +func (server *ServerMonitor) JobReseedBackupScript() { + + cmd := exec.Command(server.ClusterGroup.Conf.BackupLoadScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host), server.Port, server.ClusterGroup.master.Port) + + server.ClusterGroup.LogPrintf(LvlInfo, "Command backup load script: %s", strings.Replace(cmd.String(), server.ClusterGroup.dbPass, "XXXX", 1)) + + stdoutIn, _ := cmd.StdoutPipe() + stderrIn, _ := cmd.StderrPipe() + cmd.Start() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + server.copyLogs(stdoutIn) + }() + go func() { + defer wg.Done() + server.copyLogs(stderrIn) + }() + wg.Wait() + if err := cmd.Wait(); err != nil { + server.ClusterGroup.LogPrintf(LvlErr, "My reload script: %s", err) + return + } + server.ClusterGroup.LogPrintf(LvlInfo, "Finish logical restaure from load script on %s ", server.URL) + +} + func (server *ServerMonitor) JobMyLoaderParseMeta(dir string) (config.MyDumperMetaData, error) { var m config.MyDumperMetaData @@ -615,7 +645,31 @@ func (server *ServerMonitor) JobBackupLogical() error { server.ClusterGroup.LogSQL("BACKUP BLOCK_DDL", err, server.URL, "JobBackupLogical", LvlErr, "Failed SQL for server %s: %s ", server.URL, err) server.ClusterGroup.LogPrintf(LvlInfo, "Blocking DDL via BACKUP STAGE") } - + if server.ClusterGroup.Conf.BackupSaveScript != "" { + scriptCmd := exec.Command(server.ClusterGroup.Conf.BackupSaveScript, server.Host, server.GetCluster().GetMaster().Host, server.Port, server.GetCluster().GetMaster().Port) + server.ClusterGroup.LogPrintf(LvlInfo, "Command: %s", strings.Replace(scriptCmd.String(), server.ClusterGroup.dbPass, "XXXX", 1)) + stdoutIn, _ := scriptCmd.StdoutPipe() + stderrIn, _ := 
scriptCmd.StderrPipe() + scriptCmd.Start() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + server.copyLogs(stdoutIn) + }() + go func() { + defer wg.Done() + server.copyLogs(stderrIn) + }() + wg.Wait() + if err := scriptCmd.Wait(); err != nil { + server.ClusterGroup.LogPrintf(LvlErr, "Backup script error: %s", err) + return err + } else { + server.SetBackupLogicalCookie() + } + return nil + } if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeRiver { cfg := new(river.Config) cfg.MyHost = server.URL @@ -736,6 +790,7 @@ func (server *ServerMonitor) JobBackupLogical() error { server.ClusterGroup.LogPrintf(LvlErr, "Dumpling %s", err) } + if server.ClusterGroup.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { // --no-schemas --regex '^(?!(mysql))' diff --git a/cluster/srv_rejoin.go b/cluster/srv_rejoin.go index 760edb4a0..57d505a62 100644 --- a/cluster/srv_rejoin.go +++ b/cluster/srv_rejoin.go @@ -55,6 +55,12 @@ func (server *ServerMonitor) RejoinMaster() error { if server.ClusterGroup.master != nil { if server.URL != server.ClusterGroup.master.URL { server.ClusterGroup.SetState("WARN0022", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0022"], server.URL, server.ClusterGroup.master.URL), ErrFrom: "REJOIN"}) + server.RejoinScript() + if server.ClusterGroup.Conf.FailoverSemiSyncState { + server.ClusterGroup.LogPrintf("INFO", "Set semisync replica and disable semisync leader %s", server.URL) + logs, err := server.SetSemiSyncReplica() + server.ClusterGroup.LogSQL(logs, err, server.URL, "Rejoin", LvlErr, "Failed Set semisync replica and disable semisync %s, %s", server.URL, err) + } crash := server.ClusterGroup.getCrashFromJoiner(server.URL) if crash == nil { server.ClusterGroup.SetState("ERR00066", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf(clusterError["ERR00066"], server.URL, server.ClusterGroup.master.URL), ErrFrom: "REJOIN"}) @@ -135,14 +141,14 @@ func (server 
*ServerMonitor) RejoinMasterSST() error { server.JobFlashbackPhysicalBackup() } else if server.ClusterGroup.Conf.AutorejoinZFSFlashback { server.RejoinPreviousSnapshot() - } else if server.ClusterGroup.Conf.RejoinScript != "" { - server.ClusterGroup.LogPrintf("INFO", "Calling rejoin flashback script") + } else if server.ClusterGroup.Conf.BackupLoadScript != "" { + server.ClusterGroup.LogPrintf("INFO", "Calling restore script") var out []byte - out, err := exec.Command(server.ClusterGroup.Conf.RejoinScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host)).CombinedOutput() + out, err := exec.Command(server.ClusterGroup.Conf.BackupLoadScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host), server.Port, server.GetCluster().GetMaster().Port).CombinedOutput() if err != nil { server.ClusterGroup.LogPrintf("ERROR", "%s", err) } - server.ClusterGroup.LogPrintf("INFO", "Rejoin script complete %s", string(out)) + server.ClusterGroup.LogPrintf("INFO", "Restore script complete %s", string(out)) } else { server.ClusterGroup.LogPrintf("INFO", "No SST rejoin method found") return errors.New("No SST rejoin flashback method found") @@ -151,6 +157,20 @@ func (server *ServerMonitor) RejoinMasterSST() error { return nil } +func (server *ServerMonitor) RejoinScript() { + // Call pre-rejoin script + if server.GetCluster().Conf.RejoinScript != "" { + server.ClusterGroup.LogPrintf("INFO", "Calling rejoin script") + var out []byte + var err error + out, err = exec.Command(server.ClusterGroup.Conf.RejoinScript, server.Host, server.GetCluster().GetMaster().Host, server.Port, server.GetCluster().GetMaster().Port).CombinedOutput() + if err != nil { + server.ClusterGroup.LogPrintf(LvlErr, "%s", err) + } + server.ClusterGroup.LogPrintf(LvlInfo, "Rejoin script complete: %s", string(out)) + } +} + func (server *ServerMonitor) ReseedMasterSST() error { server.DelWaitBackupCookie() if server.ClusterGroup.Conf.AutorejoinMysqldump == true { @@ 
-161,18 +181,12 @@ func (server *ServerMonitor) ReseedMasterSST() error { return errors.New("Dump from master failed") } } else { - if server.ClusterGroup.Conf.AutorejoinLogicalBackup { + if server.ClusterGroup.Conf.BackupLoadScript != "" { + server.JobReseedBackupScript() + } else if server.ClusterGroup.Conf.AutorejoinLogicalBackup { server.JobReseedLogicalBackup() } else if server.ClusterGroup.Conf.AutorejoinPhysicalBackup { server.JobReseedPhysicalBackup() - } else if server.ClusterGroup.Conf.RejoinScript != "" { - server.ClusterGroup.LogPrintf("INFO", "Calling rejoin script") - var out []byte - out, err := exec.Command(server.ClusterGroup.Conf.RejoinScript, misc.Unbracket(server.Host), misc.Unbracket(server.ClusterGroup.master.Host)).CombinedOutput() - if err != nil { - server.ClusterGroup.LogPrintf("ERROR", "%s", err) - } - server.ClusterGroup.LogPrintf("INFO", "Rejoin script complete %s", string(out)) } else { server.ClusterGroup.LogPrintf("INFO", "No SST reseed method found") return errors.New("No SST reseed method found") diff --git a/cluster/srv_set.go b/cluster/srv_set.go index 36e835ae6..c859d0dd5 100644 --- a/cluster/srv_set.go +++ b/cluster/srv_set.go @@ -80,6 +80,28 @@ func (server *ServerMonitor) SetPreferedBackup(pref bool) { server.PreferedBackup = pref } +func (server *ServerMonitor) SetSemiSyncReplica() (string, error) { + logs := "" + if !server.IsSemiSyncReplica() { + var err error + if logs, err = dbhelper.SetSemiSyncSlave(server.Conn, server.DBVersion); err != nil { + return logs, err + } + } + return logs, nil +} + +func (server *ServerMonitor) SetSemiSyncLeader() (string, error) { + logs := "" + if !server.IsSemiSyncMaster() { + var err error + if logs, err = dbhelper.SetSemiSyncMaster(server.Conn, server.DBVersion); err != nil { + return logs, err + } + } + return logs, nil +} + func (server *ServerMonitor) SetReadOnly() (string, error) { logs := "" if !server.IsReadOnly() { diff --git a/config/config.go b/config/config.go index 6b25910a5..a8ed9ec50 100644 --- 
a/config/config.go +++ b/config/config.go @@ -133,6 +133,7 @@ type Config struct { PreScript string `mapstructure:"failover-pre-script" toml:"failover-pre-script" json:"failoverPreScript"` PostScript string `mapstructure:"failover-post-script" toml:"failover-post-script" json:"failoverPostScript"` ReadOnly bool `mapstructure:"failover-readonly-state" toml:"failover-readonly-state" json:"failoverReadOnlyState"` + FailoverSemiSyncState bool `mapstructure:"failover-semisync-state" toml:"failover-semisync-state" json:"failoverSemisyncState"` SuperReadOnly bool `mapstructure:"failover-superreadonly-state" toml:"failover-superreadonly-state" json:"failoverSuperReadOnlyState"` FailTime int64 `mapstructure:"failover-time-limit" toml:"failover-time-limit" json:"failoverTimeLimit"` FailSync bool `mapstructure:"failover-at-sync" toml:"failover-at-sync" json:"failoverAtSync"` @@ -462,6 +463,8 @@ type Config struct { BackupPhysicalCron string `mapstructure:"scheduler-db-servers-physical-backup-cron" toml:"scheduler-db-servers-physical-backup-cron" json:"schedulerDbServersPhysicalBackupCron"` BackupDatabaseLogCron string `mapstructure:"scheduler-db-servers-logs-cron" toml:"scheduler-db-servers-logs-cron" json:"schedulerDbServersLogsCron"` BackupDatabaseOptimizeCron string `mapstructure:"scheduler-db-servers-optimize-cron" toml:"scheduler-db-servers-optimize-cron" json:"schedulerDbServersOptimizeCron"` + BackupSaveScript string `mapstructure:"backup-save-script" toml:"backup-save-script" json:"backupSaveScript"` + BackupLoadScript string `mapstructure:"backup-load-script" toml:"backup-load-script" json:"backupLoadScript"` SchedulerDatabaseLogsTableRotate bool `mapstructure:"scheduler-db-servers-logs-table-rotate" toml:"scheduler-db-servers-logs-table-rotate" json:"schedulerDbServersLogsTableRotate"` SchedulerDatabaseLogsTableRotateCron string `mapstructure:"scheduler-db-servers-logs-table-rotate-cron" toml:"scheduler-db-servers-logs-table-rotate-cron" 
json:"schedulerDbServersLogsTableRotateCron"` SchedulerMaintenanceDatabaseLogsTableKeep int `mapstructure:"scheduler-db-servers-logs-table-keep" toml:"scheduler-db-servers-logs-table-keep" json:"schedulerDatabaseLogsTableKeep"` diff --git a/server/server_monitor.go b/server/server_monitor.go index aaf02199f..4ec96ed9d 100644 --- a/server/server_monitor.go +++ b/server/server_monitor.go @@ -140,6 +140,7 @@ func init() { monitorCmd.Flags().StringVar(&conf.PreScript, "failover-pre-script", "", "Path of pre-failover script") monitorCmd.Flags().StringVar(&conf.PostScript, "failover-post-script", "", "Path of post-failover script") monitorCmd.Flags().BoolVar(&conf.ReadOnly, "failover-readonly-state", true, "Failover Switchover set slaves as read-only") + monitorCmd.Flags().BoolVar(&conf.FailoverSemiSyncState, "failover-semisync-state", false, "Failover Switchover set semisync slave master state") monitorCmd.Flags().BoolVar(&conf.SuperReadOnly, "failover-superreadonly-state", false, "Failover Switchover set slaves as super-read-only") monitorCmd.Flags().StringVar(&conf.FailMode, "failover-mode", "manual", "Failover is manual or automatic") monitorCmd.Flags().Int64Var(&conf.FailMaxDelay, "failover-max-slave-delay", 30, "Election ignore slave with replication delay over this time in sec") @@ -366,6 +367,9 @@ func init() { monitorCmd.Flags().IntVar(&conf.BackupKeepMonthly, "backup-keep-monthly", 12, "Keep this number of monthly backup") monitorCmd.Flags().IntVar(&conf.BackupKeepYearly, "backup-keep-yearly", 2, "Keep this number of yearly backup") + monitorCmd.Flags().StringVar(&conf.BackupSaveScript, "backup-save-script", "", "Customized backup save script") + monitorCmd.Flags().StringVar(&conf.BackupLoadScript, "backup-load-script", "", "Customized backup load script") + monitorCmd.Flags().StringVar(&conf.BackupMyDumperPath, "backup-mydumper-path", "/usr/bin/mydumper", "Path to mydumper binary") monitorCmd.Flags().StringVar(&conf.BackupMyLoaderPath, "backup-myloader-path", 
"/usr/bin/myloader", "Path to myloader binary") monitorCmd.Flags().StringVar(&conf.BackupMyLoaderOptions, "backup-myloader-options", "--overwrite-tables --enable-binlog --verbose=3", "Extra options") diff --git a/utils/dbhelper/dbhelper.go b/utils/dbhelper/dbhelper.go index 865b7c5d9..307cd52f3 100644 --- a/utils/dbhelper/dbhelper.go +++ b/utils/dbhelper/dbhelper.go @@ -1957,6 +1957,42 @@ func SetMaxConnections(db *sqlx.DB, connections string, myver *MySQLVersion) (st return query, err } +func SetSemiSyncSlave(db *sqlx.DB, myver *MySQLVersion) (string, error) { + + query := "SET GLOBAL rpl-semi-sync-slave-enabled=1" + if myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26)) { + query = "SET GLOBAL rpl_semi_sync_replica_enabled=1" + } + _, err := db.Exec(query) + if err != nil { + return query, err + } + query = "SET GLOBAL rpl-semi-sync-master-enabled=0" + if myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26)) { + query = "SET GLOBAL rpl_semi_sync_source_enabled=0" + } + _, err = db.Exec(query) + return query, err +} + +func SetSemiSyncMaster(db *sqlx.DB, myver *MySQLVersion) (string, error) { + + query := "SET GLOBAL rpl-semi-sync-master-enabled=1" + if myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26)) { + query = "SET GLOBAL rpl_semi_sync_source_enabled=1" + } + _, err := db.Exec(query) + if err != nil { + return query, err + } + query = "SET GLOBAL rpl-semi-sync-slave-enabled=0" + if myver.IsMySQL() && ((myver.Major >= 8 && myver.Minor > 0) || (myver.Major >= 8 && myver.Minor == 0 && myver.Release >= 26)) { + query = "SET GLOBAL rpl_semi_sync_replica_enabled=0" + } + _, err = db.Exec(query) + return query, err +} + func SetSlaveGTIDModeStrict(db *sqlx.DB, myver *MySQLVersion) (string, error) { var err error stmt := ""