Merge pull request #831 from signal18/backup
PITR using local binary log and latest backup
svaroqui authored Sep 4, 2024
2 parents dc41788 + f0dc196 commit d9ff6e3
Showing 23 changed files with 1,191 additions and 232 deletions.
7 changes: 4 additions & 3 deletions cluster/cluster.go
@@ -75,7 +75,8 @@ type Cluster struct {
Servers serverList `json:"-"`
LogSlaveServers []string `json:"-"` //To store slave with log-slave-updates
ServerIdList []string `json:"dbServers"`
Crashes crashList `json:"dbServersCrashes"`
Crashes crashList `json:"dbServersCrashes"` //This will be purged on all db node up
FailoverHistory crashList `json:"failoverHistory"` //This will be used for PITR
Proxies proxyList `json:"-"`
ProxyIdList []string `json:"proxyServers"`
FailoverCtr int `json:"failoverCounter"`
@@ -762,7 +763,7 @@ func (cluster *Cluster) StateProcessing() {
for _, srv := range cluster.Servers {
if srv.HasWaitLogicalBackupCookie() {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Server %s was waiting for logical backup", srv.URL)
go srv.JobReseedLogicalBackup()
go srv.JobReseedLogicalBackup("default")
}
}
}
@@ -772,7 +773,7 @@ func (cluster *Cluster) StateProcessing() {
if srv.HasWaitPhysicalBackupCookie() {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Server %s was waiting for physical backup", srv.URL)
go func() {
err := srv.JobReseedPhysicalBackup()
err := srv.JobReseedPhysicalBackup("default")
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, err.Error())
}
3 changes: 3 additions & 0 deletions cluster/cluster_acl.go
@@ -245,6 +245,9 @@ func (cluster *Cluster) IsURLPassDatabasesACL(strUser string, URL string) bool {
if strings.Contains(URL, "/actions/reseed/") {
return true
}
if strings.Contains(URL, "/actions/pitr") {
return true
}
if strings.Contains(URL, "/actions/reseed-cancel") {
return true
}
10 changes: 10 additions & 0 deletions cluster/cluster_fail.go
@@ -162,6 +162,10 @@ func (cluster *Cluster) MasterFailover(fail bool) bool {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlDbg, "master_log_pos=%s", ms.ReadMasterLogPos.String)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlDbg, "Candidate semisync %t", cluster.master.SemiSyncSlaveStatus)
crash := new(Crash)
if fail == false {
crash.Switchover = true
}
crash.UnixTimestamp = time.Now().Unix()
crash.URL = cluster.oldMaster.URL
crash.ElectedMasterURL = cluster.master.URL
crash.FailoverMasterLogFile = ms.MasterLogFile.String
@@ -230,6 +234,7 @@
}
}
cluster.Crashes = append(cluster.Crashes, crash)
cluster.FailoverHistory.StoreLastN(crash, cluster.Conf.FailoverLogFileKeep)
t := time.Now()
crash.Save(cluster.WorkingDir + "/failover." + t.Format("20060102150405") + ".json")
crash.Purge(cluster.WorkingDir, cluster.Conf.FailoverLogFileKeep)
@@ -1283,6 +1288,10 @@ func (cluster *Cluster) VMasterFailover(fail bool) bool {
}

crash := new(Crash)
if fail == false {
crash.Switchover = true
}
crash.UnixTimestamp = time.Now().Unix()
crash.URL = cluster.oldMaster.URL
crash.ElectedMasterURL = cluster.master.URL
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Save replication status before electing")
@@ -1313,6 +1322,7 @@ func (cluster *Cluster) VMasterFailover(fail bool) bool {
cluster.master.FailoverSemiSyncSlaveStatus = cluster.master.SemiSyncSlaveStatus
crash.FailoverSemiSyncSlaveStatus = cluster.master.SemiSyncSlaveStatus
cluster.Crashes = append(cluster.Crashes, crash)
cluster.FailoverHistory.StoreLastN(crash, cluster.Conf.FailoverLogFileKeep)
cluster.Save()
t := time.Now()
crash.Save(cluster.WorkingDir + "/failover." + t.Format("20060102150405") + ".json")
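Both failover paths now stamp each Crash with a Unix timestamp, flag switchovers, and mirror the record into FailoverHistory. A hypothetical sketch of how PITR code might consume that history (the helper name is illustrative, not part of the repository):

// lastElectionPoint returns the coordinates of the most recent election,
// e.g. as a lower bound when replaying binary logs during PITR.
// Hypothetical helper for illustration only.
func (cluster *Cluster) lastElectionPoint() (logFile string, at int64, ok bool) {
	crash := cluster.FailoverHistory.GetLatest()
	if crash == nil {
		return "", 0, false
	}
	return crash.FailoverMasterLogFile, crash.UnixTimestamp, true
}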
64 changes: 64 additions & 0 deletions cluster/cluster_set.go
@@ -444,6 +444,37 @@ func (cluster *Cluster) SetBenchMethod(m string) {
cluster.benchmarkType = m
}

func (cluster *Cluster) AddPrefMaster(node *ServerMonitor) {
if cluster.IsInPreferedHosts(node) {
return
}

savedPrefMaster := cluster.GetPreferedMasterList()
if savedPrefMaster == "" {
cluster.SetPrefMaster(node.URL)
} else {
cluster.SetPrefMaster(savedPrefMaster + "," + node.URL)
}
}

func (cluster *Cluster) RemovePrefMaster(node *ServerMonitor) error {
if !cluster.IsInPreferedHosts(node) {
return fmt.Errorf("Host not found in prefered list")
}

savedPrefMaster := cluster.GetPreferedMasterList()
if savedPrefMaster == node.URL {
cluster.SetPrefMaster("")
} else {
//Remove the prefered from list
newPrefMaster := strings.Replace(savedPrefMaster, node.URL+",", "", -1)
newPrefMaster = strings.Replace(newPrefMaster, ","+node.URL, "", -1)
cluster.SetPrefMaster(newPrefMaster)
}

return nil
}

// SetPrefMaster is used by regtest test_switchover_semisync_switchback_prefmaster_norplcheck and API to force a server
func (cluster *Cluster) SetPrefMaster(PrefMasterURL string) {
var prefmasterlist []string
@@ -459,6 +490,39 @@ func (cluster *Cluster) SetPrefMaster(PrefMasterURL string) {
// fmt.Printf("Update config prefered Master: " + cluster.Conf.PrefMaster + "\n")
}

// Set Ignored Host for Election
func (cluster *Cluster) AddIgnoreSrv(node *ServerMonitor) {
if cluster.IsInIgnoredHosts(node) {
return
}

savedIgnoredHost := cluster.GetIgnoredHostList()
if savedIgnoredHost == "" {
cluster.SetIgnoreSrv(node.URL)
} else {
cluster.SetIgnoreSrv(savedIgnoredHost + "," + node.URL)
}
}

// Set Ignored Host for Election
func (cluster *Cluster) RemoveIgnoreSrv(node *ServerMonitor) error {
if !cluster.IsInIgnoredHosts(node) {
return fmt.Errorf("Host not found in ignored list")
}

savedIgnoredHost := cluster.GetIgnoredHostList()
if savedIgnoredHost == node.URL {
cluster.SetIgnoreSrv("")
} else {
//Remove the prefered from list
newIgnoredHost := strings.Replace(savedIgnoredHost, node.URL+",", "", -1)
newIgnoredHost = strings.Replace(newIgnoredHost, ","+node.URL, "", -1)
cluster.SetIgnoreSrv(newIgnoredHost)
}

return nil
}

// Set Ignored Host for Election
func (cluster *Cluster) SetIgnoreSrv(IgnoredHostURL string) {
var ignoresrvlist []string
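The new Add/Remove helpers keep the preferred-master and ignored-server settings as comma-separated URL lists, adding a host only when it is absent and stripping it with string replacement on removal. A standalone sketch of the same removal technique (function name and sample values are illustrative, not taken from the repository):

// removeFromList drops one URL from a comma-separated list, mirroring the
// strings.Replace approach used by RemovePrefMaster and RemoveIgnoreSrv.
func removeFromList(list, url string) string {
	if list == url {
		return ""
	}
	list = strings.Replace(list, url+",", "", -1)
	return strings.Replace(list, ","+url, "", -1)
}

// removeFromList("db1:3306,db2:3306,db3:3306", "db2:3306") returns "db1:3306,db3:3306"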
28 changes: 28 additions & 0 deletions cluster/crash.go
@@ -14,6 +14,7 @@ import (
"encoding/json"
"os"
"strings"
"time"

"github.com/signal18/replication-manager/utils/gtid"
)
@@ -29,14 +30,41 @@ type Crash struct {
FailoverSemiSyncSlaveStatus bool
FailoverIOGtid *gtid.List
ElectedMasterURL string
UnixTimestamp int64
Switchover bool
}

// Collection of Crash reports
// swagger:response crashList
type crashList []*Crash

func (clist *crashList) Purge(keep int) {
if keep <= 0 {
*clist = nil
return
}
if len(*clist) > keep {
*clist = (*clist)[len(*clist)-keep:]
}
}

func (clist *crashList) StoreLastN(cr *Crash, keep int) {
*clist = append(*clist, cr)
clist.Purge(keep)
}

func (clist *crashList) GetLatest() *Crash {
size := len(*clist)
if size < 1 {
return nil
}

return (*clist)[size-1]
}

func (cluster *Cluster) newCrash(*Crash) (*Crash, error) {
crash := new(Crash)
crash.UnixTimestamp = time.Now().Unix()
return crash, nil
}

1 change: 1 addition & 0 deletions cluster/srv.go
@@ -203,6 +203,7 @@ type ServerMonitor struct {
IsRefreshingBinlogMeta bool
IsLoadingJobList bool
NeedRefreshJobs bool
PointInTimeMeta config.PointInTimeMeta
BinaryLogDir string
DBDataDir string
LastBackupMeta ServerBackupMeta `json:"lastBackupMeta"`
98 changes: 97 additions & 1 deletion cluster/srv_bck.go
@@ -11,7 +11,10 @@ package cluster

import (
"encoding/json"
"errors"
"fmt"
"os"
"time"

"github.com/signal18/replication-manager/config"
)
@@ -64,7 +67,9 @@ func (server *ServerMonitor) AppendLastMetadata(method string, latest *int64) {
*latest = meta.Id
}
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlDbg, "Error reading %s meta: %s", method, err.Error())
if !errors.Is(err, os.ErrNotExist) {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlDbg, "Error reading %s meta: %s", method, err.Error())
}
}
}

@@ -129,3 +134,94 @@ func (server *ServerMonitor) GetLatestMeta(method string) (int64, *config.Backup

return latest, meta
}

func (server *ServerMonitor) ReseedPointInTime(meta config.PointInTimeMeta) error {
var err error
cluster := server.ClusterGroup

server.SetPointInTimeMeta(meta) //Set for PITR
defer server.SetPointInTimeMeta(config.PointInTimeMeta{}) //Reset after done

backup := cluster.BackupMetaMap.Get(meta.Backup)
if backup == nil {
return fmt.Errorf("Backup with id %d not found in BackupMetaMap", meta.Backup)
}

// Needed for updating status
if !cluster.Conf.MonitorScheduler {
return fmt.Errorf("PITR on node %s can not continue without monitoring scheduler", server.URL)
}

// Prevent reseed with incompatible tools
if server.IsMariaDB() && server.DBVersion.GreaterEqual("10.1") && backup.BackupTool == "xtrabackup" {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Node %s MariaDB version is greater than 10.1 and not compatible with xtrabackup. Cancelling reseed for data safety.", server.URL)
return fmt.Errorf("Node %s MariaDB version is greater than 10.1 and not compatible with xtrabackup.", server.URL)
}

if !meta.UseBinlog {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Requesting PITR on node %s with %s without using binary logs", server.URL, backup.BackupTool)
}

switch backup.BackupTool {
case config.ConstBackupLogicalTypeMysqldump, config.ConstBackupLogicalTypeMydumper, config.ConstBackupLogicalTypeRiver, config.ConstBackupLogicalTypeDumpling:
err = server.JobReseedLogicalBackup(backup.BackupTool)
case config.ConstBackupPhysicalTypeXtrabackup, config.ConstBackupPhysicalTypeMariaBackup:
err = server.JobReseedPhysicalBackup(backup.BackupTool)
default:
return fmt.Errorf("Wrong backup type for reseed: got %s", backup.BackupTool)
}
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Error while trying to execute PITR on %s. err: %s", server.URL, err.Error())
return err
}

// Wait for 15 seconds until job result updated.
time.Sleep(time.Second * 15)

task := server.JobResults.Get("reseed" + backup.BackupTool)

//Wait until job result changed since we're using pointer
for task.State < 3 {
time.Sleep(time.Second)
}

//If failed
if task.State > 4 {
err = fmt.Errorf("Unable to complete reseed from backup using %s", backup.BackupTool)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Error while trying to execute PITR on %s: %s", server.URL, err)
return err
}

if !meta.UseBinlog {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "PITR done on node %s without using binary logs", server.URL)
return nil
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Continue for injecting binary logs on %s until %s", server.URL, time.Unix(meta.RestoreTime, 0).Format(time.RFC3339))

source := cluster.GetServerFromURL(backup.Source)
if source == nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Error while trying to execute PITR on %s. Unable to get backup source: %s", server.URL, backup.Source)
return fmt.Errorf("Source not found")
}

start := config.ReadBinaryLogsBoundary{Filename: backup.BinLogFileName, Position: int64(backup.BinLogFilePos)}
end := config.ReadBinaryLogsBoundary{UseTimestamp: true, Timestamp: time.Unix(meta.RestoreTime, 0)}
err = source.ReadAndExecBinaryLogsWithinRange(start, end, server)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Error while applying binlogs on %s. err: %s", server.URL, err.Error())
return err
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Binary logs injected on %s until %s", server.URL, time.Unix(meta.RestoreTime, 0).Format(time.RFC3339))

return nil
}

func (server *ServerMonitor) InjectViaBinlogs(meta config.PointInTimeMeta) error {
return nil
}

func (server *ServerMonitor) InjectViaReplication(meta config.PointInTimeMeta) error {
return nil
}
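ReseedPointInTime restores the selected backup with its original tool, polls the reseed job until it settles, and, when UseBinlog is set, replays the source's binary logs from the backup's recorded coordinates up to the requested timestamp. A hypothetical caller, assuming config.PointInTimeMeta exposes at least the Backup, UseBinlog, and RestoreTime fields used above:

// Illustrative only: backupID would come from an entry in cluster.BackupMetaMap.
meta := config.PointInTimeMeta{
	Backup:      backupID,                          // id of the backup to restore
	UseBinlog:   true,                              // replay local binary logs after the restore
	RestoreTime: time.Now().Add(-time.Hour).Unix(), // stop replaying at this point in time
}
if err := server.ReseedPointInTime(meta); err != nil {
	// in the real code the error is reported through the cluster logger
}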