Skip to content

Commit

Permalink
Merge pull request #666 from signal18/pitr
Browse files Browse the repository at this point in the history
Logical reseed and flashback fix for replication using source name
  • Loading branch information
svaroqui committed Jun 26, 2024
2 parents 3fe05a8 + 810487f commit b0efc71
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 50 deletions.
53 changes: 32 additions & 21 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,48 +717,59 @@ func (cluster *Cluster) StateProcessing() {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master physical backup to reseed %s", s.ServerUrl)
if master != nil {
backupext := ".xbtream"
task := "reseed" + cluster.Conf.BackupPhysicalType

if cluster.Conf.CompressBackups {
backupext = backupext + ".gz"
}

if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed)
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
} else {
go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed)
go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task)
}
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl)
}
}
if s.ErrKey == "WARN0075" {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master logical backup to reseed %s", s.ServerUrl)
if master != nil {
if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed)
} else {
go cluster.SSTRunSender(master.GetMasterBackupDirectory()+"mysqldump.sql.gz", servertoreseed)
}
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl)
}
/*
This action is inactive due to direct function from Job
*/
// //Only mysqldump exists in the script
// task := "reseed" + cluster.Conf.BackupLogicalType
// cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master logical backup to reseed %s", s.ServerUrl)
// if master != nil {
// if mybcksrv != nil {
// go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task)
// } else {
// go cluster.SSTRunSender(master.GetMasterBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task)
// }
// } else {
// cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl)
// }
}
if s.ErrKey == "WARN0076" {
task := "flashback" + cluster.Conf.BackupPhysicalType
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending server physical backup to flashback reseed %s", s.ServerUrl)
if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed)
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
} else {
go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed)
go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task)
}
}
if s.ErrKey == "WARN0077" {

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to flashback reseed %s", s.ServerUrl)
if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed)
} else {
go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed)
}
/*
This action is inactive due to direct function from rejoin
*/
// //Only mysqldump exists in the script
// task := "flashback" + cluster.Conf.BackupLogicalType
// cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to flashback reseed %s", s.ServerUrl)
// if mybcksrv != nil {
// go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task)
// } else {
// go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task)
// }
}
if s.ErrKey == "WARN0101" {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Cluster have backup")
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func (cluster *Cluster) IsURLPassACL(strUser string, URL string) bool {
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/settings/actions/discover") {
return true
}
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/settings/actions/reset-failover-control") {
if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/reset-failover-control") {
return true
}
}
Expand Down
17 changes: 12 additions & 5 deletions cluster/cluster_sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,13 @@ func (sst *SST) stream_copy_to_restic() <-chan int {
return sync_channel
}

func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor, task string) {
port, _ := strconv.Atoi(sv.SSTPort)

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlWarn, "SST Reseed to port %s server %s", sv.SSTPort, sv.Host)

if cluster.Conf.SchedulerReceiverUseSSL {
cluster.SSTRunSenderSSL(backupfile, sv)
cluster.SSTRunSenderSSL(backupfile, sv, task)
return
}

Expand All @@ -457,6 +459,10 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
sendBuffer := make([]byte, cluster.Conf.SSTSendBuffer)
//fmt.Println("Start sending file!")
var total uint64

defer file.Close()
defer sv.RunTaskCallback(task)

for {
if strings.Contains(backupfile, "gz") {
fz, err := gzip.NewReader(file)
Expand All @@ -480,11 +486,9 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent, closing connection!")

defer file.Close()

}

func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor, task string) {
var (
client *tls.Conn
err error
Expand All @@ -505,6 +509,9 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
}
sendBuffer := make([]byte, 16384)
var total uint64

defer file.Close()
defer sv.RunTaskCallback(task)
for {
_, err = file.Read(sendBuffer)
if err == io.EOF {
Expand Down
9 changes: 8 additions & 1 deletion cluster/cluster_tgl.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,14 @@ func (cluster *Cluster) SwitchRejoinPhysicalBackup() {
func (cluster *Cluster) SwitchRejoinBackupBinlog() {
cluster.Conf.AutorejoinBackupBinlog = !cluster.Conf.AutorejoinBackupBinlog
}

func (cluster *Cluster) SwitchRejoinForceRestore() {
out := "N"
cluster.Conf.AutorejoinForceRestore = !cluster.Conf.AutorejoinForceRestore
if cluster.Conf.AutorejoinForceRestore {
out = "Y"
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Autorejoin active: %s", out)
}
func (cluster *Cluster) SwitchRejoinSemisync() {
cluster.Conf.AutorejoinSemisync = !cluster.Conf.AutorejoinSemisync
}
Expand Down
2 changes: 2 additions & 0 deletions cluster/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ type ServerMonitor struct {
IsInPFSQueryCapture bool
InPurgingBinaryLog bool
IsBackingUpBinaryLog bool
ActiveTasks sync.Map
BinaryLogDir string
DBDataDir string
sync.Mutex
}

Expand Down
15 changes: 12 additions & 3 deletions cluster/srv_cnf.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package cluster

import (
"strconv"
"strings"

"github.com/signal18/replication-manager/config"
"github.com/signal18/replication-manager/utils/dbhelper"
"github.com/signal18/replication-manager/utils/misc"
)

Expand Down Expand Up @@ -94,9 +96,16 @@ func (server *ServerMonitor) GetDatabaseDatadir() string {
} else if server.ClusterGroup.Conf.ProvOrchestrator == config.ConstOrchestratorSlapOS {
return server.SlapOSDatadir + "/var/lib/mysql"
} else if server.ClusterGroup.Conf.ProvOrchestrator == config.ConstOrchestratorOnPremise {
value := server.GetConfigVariable("DATADIR")
if value != "" {
return value
if server.DBDataDir == "" {
//Not using Variables[] due to uppercase values
if value, _, err := dbhelper.GetVariableByName(server.Conn, "DATADIR", server.DBVersion); err == nil {
value, _ := strings.CutSuffix(value, "/")

server.DBDataDir = value
return server.DBDataDir
}
} else {
return server.DBDataDir
}
}
return "/var/lib/mysql"
Expand Down
Loading

0 comments on commit b0efc71

Please sign in to comment.