From 6805bb4b1555708b679df570d0ef897568e4b591 Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Mon, 24 Jun 2024 13:06:23 +0700 Subject: [PATCH 01/10] Use correct replication channel after reseed --- cluster/srv_job.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 2a344b5ea..de76c4c9e 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -169,6 +169,7 @@ func (server *ServerMonitor) JobReseedPhysicalBackup() (int64, error) { Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime), Mode: "SLAVE_POS", SSL: cluster.Conf.ReplicationSSL, + Channel: cluster.Conf.MasterConn, }, server.DBVersion) cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Reseed can't changing master for physical backup %s request for server: %s %s", cluster.Conf.BackupPhysicalType, server.URL, err) From 2b0e99f3322fe6c0860ae2afec76b8f329a7788d Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Mon, 24 Jun 2024 15:16:12 +0700 Subject: [PATCH 02/10] Fix reseed logical backup with source replication --- cluster/srv_job.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster/srv_job.go b/cluster/srv_job.go index de76c4c9e..37417d40f 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -246,6 +246,7 @@ func (server *ServerMonitor) JobReseedLogicalBackup() (int64, error) { Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime), Mode: "SLAVE_POS", SSL: cluster.Conf.ReplicationSSL, + Channel: cluster.Conf.MasterConn, }, server.DBVersion) cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Reseed can't changing master for logical backup %s request for server: %s %s", cluster.Conf.BackupPhysicalType, server.URL, err) if err != nil { From 37a956cd49b830892c99b9c3b8874b29d0bbdc6a Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Tue, 25 Jun 2024 10:29:59 +0700 Subject: [PATCH 03/10] add channel for change master --- cluster/srv_rejoin.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster/srv_rejoin.go b/cluster/srv_rejoin.go index 64e05fa3d..b610063e5 100644 --- a/cluster/srv_rejoin.go +++ b/cluster/srv_rejoin.go @@ -364,6 +364,7 @@ func (server *ServerMonitor) RejoinDirectDump() error { Logfile: realmaster.FailoverMasterLogFile, Logpos: realmaster.FailoverMasterLogPos, SSL: cluster.Conf.ReplicationSSL, + Channel: cluster.Conf.MasterConn, }, server.DBVersion) cluster.LogSQL(logs, err3, server.URL, "Rejoin", config.LvlErr, "Failed change master maxscale on %s: %s", server.URL, err3) } From 56c234d58500a3fe59e32360d74a741c052be46f Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Tue, 25 Jun 2024 19:54:39 +0700 Subject: [PATCH 04/10] reseed-mysqldump --- cluster/cluster.go | 39 ++++++++------- cluster/cluster_sst.go | 17 +++++-- cluster/srv.go | 2 + cluster/srv_cnf.go | 15 ++++-- cluster/srv_job.go | 107 +++++++++++++++++++++++++++++++++++++---- config/config.go | 2 +- 6 files changed, 146 insertions(+), 36 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index bb13e2c3d..bdba93334 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -717,47 +717,52 @@ func (cluster *Cluster) StateProcessing() { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master physical backup to reseed %s", s.ServerUrl) if master != nil { backupext := ".xbtream" + task := "reseed" + cluster.Conf.BackupPhysicalType if cluster.Conf.CompressBackups { backupext = backupext + ".gz" } if mybcksrv != nil { - go 
cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed) + go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task) } else { - go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed) + go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, servertoreseed, task) } } else { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl) } } if s.ErrKey == "WARN0075" { - cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master logical backup to reseed %s", s.ServerUrl) - if master != nil { - if mybcksrv != nil { - go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed) - } else { - go cluster.SSTRunSender(master.GetMasterBackupDirectory()+"mysqldump.sql.gz", servertoreseed) - } - } else { - cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl) - } + // //Only mysqldump exists in the script + // task := "reseed" + cluster.Conf.BackupLogicalType + // cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master logical backup to reseed %s", s.ServerUrl) + // if master != nil { + // if mybcksrv != nil { + // go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) + // } else { + // go cluster.SSTRunSender(master.GetMasterBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) + // } + // } else { + // cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master cancel backup reseeding %s", s.ServerUrl) + // } } if s.ErrKey == "WARN0076" { + task := "flashback" + cluster.Conf.BackupPhysicalType cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending server physical backup to flashback reseed %s", s.ServerUrl) if mybcksrv != nil { - go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed) + go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task) } else { - go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed) + go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", servertoreseed, task) } } if s.ErrKey == "WARN0077" { - + //Only mysqldump exists in the script + task := "flashback" + cluster.Conf.BackupLogicalType cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to flashback reseed %s", s.ServerUrl) if mybcksrv != nil { - go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed) + go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) } else { - go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed) + go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) } } if s.ErrKey == "WARN0101" { diff --git a/cluster/cluster_sst.go b/cluster/cluster_sst.go index a96d2825d..b229be536 100644 --- 
a/cluster/cluster_sst.go +++ b/cluster/cluster_sst.go @@ -428,11 +428,13 @@ func (sst *SST) stream_copy_to_restic() <-chan int { return sync_channel } -func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) { +func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor, task string) { port, _ := strconv.Atoi(sv.SSTPort) + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlWarn, "SST Reseed to port %s server %s", sv.SSTPort, sv.Host) + if cluster.Conf.SchedulerReceiverUseSSL { - cluster.SSTRunSenderSSL(backupfile, sv) + cluster.SSTRunSenderSSL(backupfile, sv, task) return } @@ -457,6 +459,10 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) { sendBuffer := make([]byte, cluster.Conf.SSTSendBuffer) //fmt.Println("Start sending file!") var total uint64 + + defer file.Close() + defer sv.RunTaskCallback(task) + for { if strings.Contains(backupfile, "gz") { fz, err := gzip.NewReader(file) @@ -480,11 +486,9 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) { } cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent, closing connection!") - defer file.Close() - } -func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) { +func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor, task string) { var ( client *tls.Conn err error @@ -505,6 +509,9 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) { } sendBuffer := make([]byte, 16384) var total uint64 + + defer file.Close() + defer sv.RunTaskCallback(task) for { _, err = file.Read(sendBuffer) if err == io.EOF { diff --git a/cluster/srv.go b/cluster/srv.go index 91b573c62..4adfde585 100644 --- a/cluster/srv.go +++ b/cluster/srv.go @@ -189,7 +189,9 @@ type ServerMonitor struct { IsInPFSQueryCapture bool InPurgingBinaryLog bool IsBackingUpBinaryLog bool + ActiveTasks sync.Map BinaryLogDir string + DBDataDir string sync.Mutex } diff --git a/cluster/srv_cnf.go b/cluster/srv_cnf.go index 2244b55ce..eecff7d36 100644 --- a/cluster/srv_cnf.go +++ b/cluster/srv_cnf.go @@ -8,8 +8,10 @@ package cluster import ( "strconv" + "strings" "github.com/signal18/replication-manager/config" + "github.com/signal18/replication-manager/utils/dbhelper" "github.com/signal18/replication-manager/utils/misc" ) @@ -94,9 +96,16 @@ func (server *ServerMonitor) GetDatabaseDatadir() string { } else if server.ClusterGroup.Conf.ProvOrchestrator == config.ConstOrchestratorSlapOS { return server.SlapOSDatadir + "/var/lib/mysql" } else if server.ClusterGroup.Conf.ProvOrchestrator == config.ConstOrchestratorOnPremise { - value := server.GetConfigVariable("DATADIR") - if value != "" { - return value + if server.DBDataDir == "" { + //Not using Variables[] due to uppercase values + if value, _, err := dbhelper.GetVariableByName(server.Conn, "DATADIR", server.DBVersion); err == nil { + value, _ := strings.CutSuffix(value, "/") + + server.DBDataDir = value + return server.DBDataDir + } + } else { + return server.DBDataDir } } return "/var/lib/mysql" diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 5dbb3f97a..66731d6e9 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -221,18 +221,27 @@ func (server *ServerMonitor) JobFlashbackPhysicalBackup() (int64, error) { func (server *ServerMonitor) JobReseedLogicalBackup() (int64, error) { cluster := server.ClusterGroup + task := "reseed" + cluster.Conf.BackupLogicalType + var dt DBTask = DBTask{task: 
task} if cluster.master != nil && !cluster.GetBackupServer().HasBackupLogicalCookie() { server.createCookie("cookie_waitbackup") return 0, errors.New("No Logical Backup") } - jobid, err := server.JobInsertTaks("reseed"+cluster.Conf.BackupLogicalType, server.SSTPort, cluster.Conf.MonitorAddress) + if v, ok := server.ActiveTasks.Load(task); ok { + dt = v.(DBTask) + } + jobid, err := server.JobInsertTaks(task, server.SSTPort, cluster.Conf.MonitorAddress) if err != nil { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Receive reseed logical backup %s request for server: %s %s", cluster.Conf.BackupLogicalType, server.URL, err) - return jobid, err + } else { + dt.ct++ + dt.id = jobid + server.ActiveTasks.Store(task, dt) } + logs, err := server.StopSlave() cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Failed stop slave on server: %s %s", server.URL, err) @@ -253,7 +262,9 @@ func (server *ServerMonitor) JobReseedLogicalBackup() (int64, error) { return jobid, err } cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Receive reseed logical backup %s request for server: %s", cluster.Conf.BackupLogicalType, server.URL) - if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { + if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMysqldump { + go server.JobReseedMysqldump() + } else if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { go server.JobReseedMyLoader() } return jobid, err @@ -458,6 +469,26 @@ func (server *ServerMonitor) JobReseedMyLoader() { } +func (server *ServerMonitor) JobReseedMysqldump() { + cluster := server.ClusterGroup + mybcksrv := cluster.GetBackupServer() + master := cluster.GetMaster() + go server.JobRunViaSSH() + + //Only mysqldump exists in the script + task := "reseed" + cluster.Conf.BackupLogicalType + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to reseed %s", server.URL) + if master != nil { + if mybcksrv != nil { + go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", server, task) + } else { + go cluster.SSTRunSender(master.GetMasterBackupDirectory()+"mysqldump.sql.gz", server, task) + } + } else { + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "No master. 
Cancel backup reseeding %s", server.URL) + } +} + func (server *ServerMonitor) JobReseedBackupScript() { cluster := server.ClusterGroup cmd := exec.Command(cluster.Conf.BackupLoadScript, misc.Unbracket(server.Host), misc.Unbracket(cluster.master.Host)) @@ -543,17 +574,19 @@ func (server *ServerMonitor) JobMyLoaderParseMeta(dir string) (config.MyDumperMe return m, nil } +type DBTask struct { + task string + ct int + id int64 +} + func (server *ServerMonitor) JobsCheckRunning() error { cluster := server.ClusterGroup if server.IsDown() { return nil } //server.JobInsertTaks("", "", "") - type DBTask struct { - task string - ct int - } - rows, err := server.Conn.Queryx("SELECT task ,count(*) as ct FROM replication_manager_schema.jobs WHERE done=0 AND result IS NULL group by task ") + rows, err := server.Conn.Queryx("SELECT task ,count(*) as ct, max(id) as id FROM replication_manager_schema.jobs WHERE done=0 AND result IS NULL group by task ") if err != nil { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Scheduler error fetching replication_manager_schema.jobs %s", err) server.JobsCreateTable() @@ -562,7 +595,7 @@ func (server *ServerMonitor) JobsCheckRunning() error { defer rows.Close() for rows.Next() { var task DBTask - rows.Scan(&task.task, &task.ct) + rows.Scan(&task.task, &task.ct, &task.id) if task.ct > 0 { if task.ct > 10 { cluster.StateMachine.AddState("ERR00060", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(cluster.GetErrorList()["ERR00060"], server.URL), ErrFrom: "JOB", ServerUrl: server.URL}) @@ -598,8 +631,11 @@ func (server *ServerMonitor) JobsCheckRunning() error { cluster.StateMachine.AddState("WARN0077", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(cluster.GetErrorList()["WARN0077"], cluster.Conf.BackupLogicalType, server.URL), ErrFrom: "JOB", ServerUrl: server.URL}) } else if task.task == "flashbackmysqldump" { cluster.StateMachine.AddState("WARN0077", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(cluster.GetErrorList()["WARN0077"], cluster.Conf.BackupLogicalType, server.URL), ErrFrom: "JOB", ServerUrl: server.URL}) + } else { + //Skip adding to active task if not defined + continue } - + server.ActiveTasks.Store(task.task, task) } } @@ -1379,3 +1415,54 @@ func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string, isPurge return errors.New("Wrong configuration for Backup Binlog Method!") } + +func (server *ServerMonitor) RunTaskCallback(task string) error { + cluster := server.ClusterGroup + var err error + + if v, ok := server.ActiveTasks.Load(task); ok { + dt := v.(DBTask) + switch dt.task { + case "reseedmysqldump", "flashbackmysqldump": + go server.CallbackMysqldump(dt) + } + } else { + err = errors.New("No active task found!") + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Error executing callback, %s", err.Error()) + } + + return err +} + +func (server *ServerMonitor) CallbackMysqldump(dt DBTask) error { + cluster := server.ClusterGroup + var err error + + rows, err := server.Conn.Queryx("SELECT done FROM replication_manager_schema.jobs WHERE id=?", dt.id) + if err != nil { + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Scheduler error fetching replication_manager_schema.jobs %s", err) + server.JobsCreateTable() + return err + } + + var done int + var count int = 0 + defer rows.Close() + for rows.Next() { + rows.Scan(&done) + count++ + } + + //Check if id exists + if count > 0 { + if done == 1 { + 
server.StartSlave() + return nil + } else { + time.Sleep(time.Second * time.Duration(cluster.Conf.MonitoringTicker)) + return server.CallbackMysqldump(dt) + } + } + + return err +} diff --git a/config/config.go b/config/config.go index d77d55e31..a06644b85 100644 --- a/config/config.go +++ b/config/config.go @@ -1865,7 +1865,7 @@ func (conf *Config) IsEligibleForPrinting(module int, level string) bool { break case module == ConstLogModHeartBeat: if conf.LogHeartbeat { - return conf.LogSSTLevel >= lvl + return conf.LogHeartbeatLevel >= lvl } break case module == ConstLogModConfigLoad: From 3770c3f46e966dad9d6e0c57f1f44e304b520f4b Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 11:06:21 +0700 Subject: [PATCH 05/10] use more recent mydumper and move installation to top for better cache --- docker/Dockerfile.dev | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/docker/Dockerfile.dev b/docker/Dockerfile.dev index c7a6eed14..8b3718676 100644 --- a/docker/Dockerfile.dev +++ b/docker/Dockerfile.dev @@ -1,10 +1,24 @@ FROM golang:1.23rc1-bullseye ENV PROXYSQL_VERSION=2.5.5 +ENV MYDUMPER_VERSION=0.16.3-3 RUN mkdir -p /go/src/github.com/signal18/replication-manager WORKDIR /go/src/github.com/signal18/replication-manager +# Move to top for better cache +RUN apt-get update && apt-get -y install ca-certificates restic mariadb-client mariadb-server mariadb-plugin-spider haproxy libmariadb-dev fuse sysbench curl vim libatomic1 +RUN curl -LO https://github.com/sysown/proxysql/releases/download/v$PROXYSQL_VERSION/proxysql_$PROXYSQL_VERSION-debian11_amd64.deb && dpkg -i proxysql_$PROXYSQL_VERSION-debian11_amd64.deb +RUN curl -LO https://github.com/mydumper/mydumper/releases/download/v$MYDUMPER_VERSION/mydumper_$MYDUMPER_VERSION.bullseye_amd64.deb && dpkg -i mydumper_$MYDUMPER_VERSION.bullseye_amd64.deb +RUN /usr/local/go/bin/go install -v golang.org/x/tools/gopls@latest +RUN /usr/local/go/bin/go install -v github.com/cweill/gotests/gotests@v1.6.0 +RUN /usr/local/go/bin/go install -v github.com/fatih/gomodifytags@v1.16.0 +RUN /usr/local/go/bin/go install -v github.com/josharian/impl@v1.1.0 +RUN /usr/local/go/bin/go install -v github.com/haya14busa/goplay/cmd/goplay@v1.0.0 +RUN /usr/local/go/bin/go install -v github.com/haya14busa/goplay/cmd/goplay@v1.0.0 +RUN /usr/local/go/bin/go install -v github.com/go-delve/delve/cmd/dlv@latest +RUN /usr/local/go/bin/go install -v honnef.co/go/tools/cmd/staticcheck@latest + COPY . . 
RUN make pro cli @@ -22,15 +36,5 @@ cp -r share /usr/share/replication-manager/ && \ cp build/binaries/replication-manager-pro /usr/bin/replication-manager && \ cp build/binaries/replication-manager-cli /usr/bin/replication-manager-cli -RUN apt-get update && apt-get -y install mydumper ca-certificates restic mariadb-client mariadb-server mariadb-plugin-spider haproxy libmariadb-dev fuse sysbench curl vim -RUN curl -LO https://github.com/sysown/proxysql/releases/download/v$PROXYSQL_VERSION/proxysql_$PROXYSQL_VERSION-debian11_amd64.deb && dpkg -i proxysql_$PROXYSQL_VERSION-debian11_amd64.deb -RUN /usr/local/go/bin/go install -v golang.org/x/tools/gopls@latest -RUN /usr/local/go/bin/go install -v github.com/cweill/gotests/gotests@v1.6.0 -RUN /usr/local/go/bin/go install -v github.com/fatih/gomodifytags@v1.16.0 -RUN /usr/local/go/bin/go install -v github.com/josharian/impl@v1.1.0 -RUN /usr/local/go/bin/go install -v github.com/haya14busa/goplay/cmd/goplay@v1.0.0 -RUN /usr/local/go/bin/go install -v github.com/haya14busa/goplay/cmd/goplay@v1.0.0 -RUN /usr/local/go/bin/go install -v github.com/go-delve/delve/cmd/dlv@latest -RUN /usr/local/go/bin/go install -v honnef.co/go/tools/cmd/staticcheck@latest CMD ["while true ; do sleep 1; done"] EXPOSE 10001 From f98e3fd1fbf2629bf75145136418de2ef78fe9f9 Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 12:46:20 +0700 Subject: [PATCH 06/10] Use correct options for mydumper --- cluster/srv_job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 66731d6e9..dbaaa3511 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -882,7 +882,7 @@ func (server *ServerMonitor) JobBackupLogical() error { // --no-schemas --regex '^(?!(mysql))' threads := strconv.Itoa(cluster.Conf.BackupLogicalDumpThreads) - myargs := strings.Split(strings.ReplaceAll(cluster.Conf.BackupMyLoaderOptions, " ", " "), " ") + myargs := strings.Split(strings.ReplaceAll(cluster.Conf.BackupMyDumperOptions, " ", " "), " ") myargs = append(myargs, "--outputdir="+server.GetMyBackupDirectory(), "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+cluster.GetDbUser(), "--password="+cluster.GetDbPass()) dumpCmd := exec.Command(cluster.GetMyDumperPath(), myargs...) 
From c3f7ca54e423d0038aa189b5e48798f493f6479c Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 14:36:51 +0700 Subject: [PATCH 07/10] fix reseed with myloader --- cluster/cluster_tgl.go | 9 ++++++++- cluster/srv_job.go | 4 ++++ server/api_cluster.go | 2 ++ server/server.go | 2 +- share/dashboard/static/card-setting-rejoin.html | 14 ++++++++++++++ 5 files changed, 29 insertions(+), 2 deletions(-) diff --git a/cluster/cluster_tgl.go b/cluster/cluster_tgl.go index a6b52a8da..824820591 100644 --- a/cluster/cluster_tgl.go +++ b/cluster/cluster_tgl.go @@ -204,7 +204,14 @@ func (cluster *Cluster) SwitchRejoinPhysicalBackup() { func (cluster *Cluster) SwitchRejoinBackupBinlog() { cluster.Conf.AutorejoinBackupBinlog = !cluster.Conf.AutorejoinBackupBinlog } - +func (cluster *Cluster) SwitchRejoinForceRestore() { + out := "N" + cluster.Conf.AutorejoinForceRestore = !cluster.Conf.AutorejoinForceRestore + if cluster.Conf.AutorejoinForceRestore { + out = "Y" + } + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Autorejoin active: %s", out) +} func (cluster *Cluster) SwitchRejoinSemisync() { cluster.Conf.AutorejoinSemisync = !cluster.Conf.AutorejoinSemisync } diff --git a/cluster/srv_job.go b/cluster/srv_job.go index dbaaa3511..8283cce64 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -314,6 +314,7 @@ func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) { Heartbeat: strconv.Itoa(cluster.Conf.ForceSlaveHeartbeatTime), Mode: "SLAVE_POS", SSL: cluster.Conf.ReplicationSSL, + Channel: cluster.Conf.MasterConn, }, server.DBVersion) cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Reseed can't changing master for logical backup %s request for server: %s %s", cluster.Conf.BackupPhysicalType, server.URL, err) if err != nil { @@ -429,6 +430,9 @@ func (server *ServerMonitor) JobReseedMyLoader() { threads := strconv.Itoa(cluster.Conf.BackupLogicalLoadThreads) myargs := strings.Split(strings.ReplaceAll(cluster.Conf.BackupMyLoaderOptions, " ", " "), " ") + if server.URL == cluster.GetMaster().URL { + myargs = append(myargs, "--enable-binlog") + } myargs = append(myargs, "--directory="+cluster.master.GetMasterBackupDirectory(), "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+cluster.GetDbUser(), "--password="+cluster.GetDbPass()) dumpCmd := exec.Command(cluster.GetMyLoaderPath(), myargs...) 
diff --git a/server/api_cluster.go b/server/api_cluster.go index ae7fcd73c..3341e1d45 100644 --- a/server/api_cluster.go +++ b/server/api_cluster.go @@ -1050,6 +1050,8 @@ func (repman *ReplicationManager) switchSettings(mycluster *cluster.Cluster, set mycluster.SwitchRejoinLogicalBackup() case "autorejoin-physical-backup": mycluster.SwitchRejoinPhysicalBackup() + case "autorejoin-force-restore": + mycluster.SwitchRejoinForceRestore() case "switchover-at-sync": mycluster.SwitchSwitchoverSync() case "check-replication-filters": diff --git a/server/server.go b/server/server.go index 160c6b330..485a4703f 100644 --- a/server/server.go +++ b/server/server.go @@ -651,7 +651,7 @@ func (repman *ReplicationManager) AddFlags(flags *pflag.FlagSet, conf *config.Co flags.StringVar(&conf.BackupMyDumperPath, "backup-mydumper-path", "/usr/bin/mydumper", "Path to mydumper binary") flags.StringVar(&conf.BackupMyLoaderPath, "backup-myloader-path", "/usr/bin/myloader", "Path to myloader binary") - flags.StringVar(&conf.BackupMyLoaderOptions, "backup-myloader-options", "--overwrite-tables --enable-binlog --verbose=3", "Extra options") + flags.StringVar(&conf.BackupMyLoaderOptions, "backup-myloader-options", "--overwrite-tables --verbose=3", "Extra options") flags.StringVar(&conf.BackupMyDumperOptions, "backup-mydumper-options", "--chunk-filesize=1000 --compress --less-locking --verbose=3 --triggers --routines --events --trx-consistency-only --kill-long-queries", "Extra options") flags.StringVar(&conf.BackupMysqldumpPath, "backup-mysqldump-path", "", "Path to mysqldump binary") flags.StringVar(&conf.BackupMysqldumpOptions, "backup-mysqldump-options", "--hex-blob --single-transaction --verbose --all-databases --routines=true --triggers=true --system=all", "Extra options") diff --git a/share/dashboard/static/card-setting-rejoin.html b/share/dashboard/static/card-setting-rejoin.html index ee790dc30..c41c77d01 100644 --- a/share/dashboard/static/card-setting-rejoin.html +++ b/share/dashboard/static/card-setting-rejoin.html @@ -105,6 +105,20 @@ + + Force Rejoin With Restore + + + + + OnOff + + + + Auto seed from backup standalone server From 4024f09a6e368388ad6534cfb8ee8e8535adc519 Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 17:52:16 +0700 Subject: [PATCH 08/10] logical flashback and failover limit --- cluster/cluster_acl.go | 2 +- cluster/srv_job.go | 26 ++++++++++++------- share/dashboard/app/dashboard.js | 5 +++- .../static/card-setting-replication.html | 11 ++++++++ 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/cluster/cluster_acl.go b/cluster/cluster_acl.go index 9d544659c..a33e488a1 100644 --- a/cluster/cluster_acl.go +++ b/cluster/cluster_acl.go @@ -579,7 +579,7 @@ func (cluster *Cluster) IsURLPassACL(strUser string, URL string) bool { if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/settings/actions/discover") { return true } - if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/settings/actions/reset-failover-control") { + if strings.Contains(URL, "/api/clusters/"+cluster.Name+"/actions/reset-failover-control") { return true } } diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 8283cce64..919a32b77 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -263,7 +263,7 @@ func (server *ServerMonitor) JobReseedLogicalBackup() (int64, error) { } cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Receive reseed logical backup %s request for server: %s", cluster.Conf.BackupLogicalType, server.URL) if 
cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMysqldump { - go server.JobReseedMysqldump() + go server.JobReseedMysqldump(task) } else if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { go server.JobReseedMyLoader() } @@ -292,15 +292,22 @@ func (server *ServerMonitor) JobServerRestart() (int64, error) { func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) { cluster := server.ClusterGroup + task := "flashback" + cluster.Conf.BackupLogicalType + var dt DBTask = DBTask{task: task} + var err error if cluster.master != nil && !cluster.GetBackupServer().HasBackupLogicalCookie() { server.createCookie("cookie_waitbackup") return 0, errors.New("No Logical Backup") } - jobid, err := server.JobInsertTaks("flashback"+cluster.Conf.BackupLogicalType, server.SSTPort, cluster.Conf.MonitorAddress) + var jobid int64 + jobid, err = server.JobInsertTaks(task, server.SSTPort, cluster.Conf.MonitorAddress) if err != nil { - cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Receive reseed logical backup %s request for server: %s %s", cluster.Conf.BackupPhysicalType, server.URL, err) - + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Receive flashback logical backup %s request for server: %s %s", cluster.Conf.BackupLogicalType, server.URL, err) return jobid, err + } else { + dt.ct++ + dt.id = jobid + server.ActiveTasks.Store(task, dt) } logs, err := server.StopSlave() cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Failed stop slave on server: %s %s", server.URL, err) @@ -316,14 +323,17 @@ func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) { SSL: cluster.Conf.ReplicationSSL, Channel: cluster.Conf.MasterConn, }, server.DBVersion) - cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "Reseed can't changing master for logical backup %s request for server: %s %s", cluster.Conf.BackupPhysicalType, server.URL, err) + cluster.LogSQL(logs, err, server.URL, "Rejoin", config.LvlErr, "flashback can't changing master for logical backup %s request for server: %s %s", cluster.Conf.BackupLogicalType, server.URL, err) if err != nil { return jobid, err } - cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Receive reseed logical backup %s request for server: %s", cluster.Conf.BackupPhysicalType, server.URL) + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Receive flashback logical backup %s request for server: %s", cluster.Conf.BackupLogicalType, server.URL) if cluster.Conf.BackupLoadScript != "" { + cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Using script from backup-load-script on %s", server.URL) go server.JobReseedBackupScript() + } else if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMysqldump { + go server.JobReseedMysqldump(task) } else if cluster.Conf.BackupLogicalType == config.ConstBackupLogicalTypeMydumper { go server.JobReseedMyLoader() } @@ -473,14 +483,12 @@ func (server *ServerMonitor) JobReseedMyLoader() { } -func (server *ServerMonitor) JobReseedMysqldump() { +func (server *ServerMonitor) JobReseedMysqldump(task string) { cluster := server.ClusterGroup mybcksrv := cluster.GetBackupServer() master := cluster.GetMaster() go server.JobRunViaSSH() - //Only mysqldump exists in the script - task := "reseed" + cluster.Conf.BackupLogicalType cluster.LogModulePrintf(cluster.Conf.Verbose, 
config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to reseed %s", server.URL) if master != nil { if mybcksrv != nil { diff --git a/share/dashboard/app/dashboard.js b/share/dashboard/app/dashboard.js index 132678c0d..0c091c1c5 100644 --- a/share/dashboard/app/dashboard.js +++ b/share/dashboard/app/dashboard.js @@ -1185,7 +1185,7 @@ app.controller('DashboardController', function ( if (confirm("Confirm database apply config")) httpGetWithoutResponse(getClusterUrl() + '/settings/actions/apply-dynamic-config'); }; $scope.resetfail = function () { - if (confirm("Reset Failover counter?")) httpGetWithoutResponse(getClusterUrl() + '/actions/reset-failover-counter'); + if (confirm("Reset Failover counter?")) httpGetWithoutResponse(getClusterUrl() + '/actions/reset-failover-control'); }; $scope.resetsla = function () { if (confirm("Reset SLA counters?")) httpGetWithoutResponse(getClusterUrl() + '/actions/reset-sla'); @@ -1700,6 +1700,9 @@ app.controller('DashboardController', function ( $scope.changemaxdelay = function (delay) { if (confirm("Confirm change delay " + delay.toString())) httpGetWithoutResponse(getClusterUrl() + '/settings/actions/set/failover-max-slave-delay/' + delay); }; + $scope.changefailoverlimit = function (limit) { + if (confirm("Confirm change failover-limit " + limit.toString())) httpGetWithoutResponse(getClusterUrl() + '/settings/actions/set/failover-limit/' + limit); + }; $scope.changebackupbinlogskeep = function (delay) { if (confirm("Confirm change keep binlogs files " + delay.toString())) httpGetWithoutResponse(getClusterUrl() + '/settings/actions/set/backup-binlogs-keep/' + delay); }; diff --git a/share/dashboard/static/card-setting-replication.html b/share/dashboard/static/card-setting-replication.html index b9a46f99f..e2ce434ae 100644 --- a/share/dashboard/static/card-setting-replication.html +++ b/share/dashboard/static/card-setting-replication.html @@ -1,5 +1,16 @@ + + + + + + From 37835f751423270db4de9c3a9add3028c9897b50 Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 18:03:55 +0700 Subject: [PATCH 09/10] remove indirect action for flashback --- cluster/cluster.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index ada5a2f95..c4ada5b7e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -733,6 +733,9 @@ func (cluster *Cluster) StateProcessing() { } } if s.ErrKey == "WARN0075" { + /* + This action is inactive due to direct function from Job + */ // //Only mysqldump exists in the script // task := "reseed" + cluster.Conf.BackupLogicalType // cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending master logical backup to reseed %s", s.ServerUrl) @@ -756,14 +759,17 @@ func (cluster *Cluster) StateProcessing() { } } if s.ErrKey == "WARN0077" { - //Only mysqldump exists in the script - task := "flashback" + cluster.Conf.BackupLogicalType - cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to flashback reseed %s", s.ServerUrl) - if mybcksrv != nil { - go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) - } else { - go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) - } + /* + This action is inactive due to direct function from rejoin + */ + // //Only mysqldump exists in the script + // task := "flashback" + cluster.Conf.BackupLogicalType + // 
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending logical backup to flashback reseed %s", s.ServerUrl) + // if mybcksrv != nil { + // go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) + // } else { + // go cluster.SSTRunSender(servertoreseed.GetMyBackupDirectory()+"mysqldump.sql.gz", servertoreseed, task) + // } } if s.ErrKey == "WARN0101" { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Cluster have backup") From 810487facd37e97f41d22c840fb0136b0dde1b21 Mon Sep 17 00:00:00 2001 From: caffeinated92 Date: Wed, 26 Jun 2024 18:08:46 +0700 Subject: [PATCH 10/10] Add load to check existing task --- cluster/srv_job.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cluster/srv_job.go b/cluster/srv_job.go index 919a32b77..b7e865c8c 100644 --- a/cluster/srv_job.go +++ b/cluster/srv_job.go @@ -299,8 +299,10 @@ func (server *ServerMonitor) JobFlashbackLogicalBackup() (int64, error) { server.createCookie("cookie_waitbackup") return 0, errors.New("No Logical Backup") } - var jobid int64 - jobid, err = server.JobInsertTaks(task, server.SSTPort, cluster.Conf.MonitorAddress) + if v, ok := server.ActiveTasks.Load(task); ok { + dt = v.(DBTask) + } + jobid, err := server.JobInsertTaks(task, server.SSTPort, cluster.Conf.MonitorAddress) if err != nil { cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlErr, "Receive flashback logical backup %s request for server: %s %s", cluster.Conf.BackupLogicalType, server.URL, err) return jobid, err
[Fragments from the card-setting-replication.html hunk above: "Failover Limit", {{selectedCluster.config.failoverLimit}}, "Checks failover & switchover constraints".]
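
For reference, the task-tracking flow that patches 04 and 10 build up (register a DBTask in ActiveTasks, defer RunTaskCallback from SSTRunSender, poll the jobs table in CallbackMysqldump, then restart replication) can be sketched as a standalone Go program. This is a simplified illustration under assumed names (dbTask, registerTask, an in-memory jobsDone map standing in for replication_manager_schema.jobs), not the project's actual implementation:

// Simplified, self-contained sketch of the reseed task-callback flow.
// All names here (dbTask, registerTask, jobsDone) are illustrative stand-ins,
// not the project's actual identifiers beyond what the patches above show.
package main

import (
	"fmt"
	"sync"
	"time"
)

type dbTask struct {
	task string
	ct   int
	id   int64
}

type server struct {
	activeTasks sync.Map       // task name -> dbTask, like ServerMonitor.ActiveTasks
	mu          sync.Mutex
	jobsDone    map[int64]bool // stand-in for replication_manager_schema.jobs "done"
}

// registerTask plays the role of JobInsertTaks + ActiveTasks.Store: remember
// the job id so the stream sender's callback can find it later.
func (s *server) registerTask(name string, id int64) {
	dt := dbTask{task: name}
	if v, ok := s.activeTasks.Load(name); ok {
		dt = v.(dbTask)
	}
	dt.ct++
	dt.id = id
	s.activeTasks.Store(name, dt)
}

// runTaskCallback plays the role of RunTaskCallback/CallbackMysqldump: poll
// until the job row is marked done, then restart replication (StartSlave in
// the real code). Written iteratively instead of recursively.
func (s *server) runTaskCallback(name string) error {
	v, ok := s.activeTasks.Load(name)
	if !ok {
		return fmt.Errorf("no active task found for %q", name)
	}
	dt := v.(dbTask)
	for {
		s.mu.Lock()
		done := s.jobsDone[dt.id]
		s.mu.Unlock()
		if done {
			fmt.Printf("task %s (job %d) done, would call StartSlave()\n", dt.task, dt.id)
			return nil
		}
		time.Sleep(10 * time.Millisecond) // MonitoringTicker pause in the real code
	}
}

func main() {
	srv := &server{jobsDone: map[int64]bool{}}
	srv.registerTask("reseedmysqldump", 42)

	// Simulate the database-side job finishing while the sender waits.
	go func() {
		time.Sleep(30 * time.Millisecond)
		srv.mu.Lock()
		srv.jobsDone[42] = true
		srv.mu.Unlock()
	}()

	// SSTRunSender defers the callback after streaming the backup file.
	if err := srv.runTaskCallback("reseedmysqldump"); err != nil {
		fmt.Println(err)
	}
}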