
Fix wrong label for state and fix compressed backup reseed function #781

Merged · 10 commits · Jul 30, 2024

Changes from all commits

6 changes: 4 additions & 2 deletions cluster/cluster_set.go
@@ -1772,7 +1772,9 @@ func (cluster *Cluster) SetMasterNil() {
}

func (cluster *Cluster) SetApiTokenTimeout(value int) {
cluster.Lock()
cluster.Conf.TokenTimeout = value
cluster.Unlock()
}

func (cluster *Cluster) SetSSTBufferSize(value int) {
cluster.Conf.SSTSendBuffer = value
}
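
The Lock/Unlock pair added here makes the token-timeout write safe against concurrent access from the monitor loop. A minimal sketch of the same setter using defer, assuming Cluster embeds sync.Mutex as the calls above imply; note that the new SetSSTBufferSize below it is left unguarded in this diff.

```go
// Sketch only — equivalent guarded setter, releasing the mutex on every
// return path via defer. Assumes Cluster embeds sync.Mutex.
func (cluster *Cluster) SetApiTokenTimeout(value int) {
	cluster.Lock()
	defer cluster.Unlock()
	cluster.Conf.TokenTimeout = value
}
```
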
118 changes: 78 additions & 40 deletions cluster/cluster_sst.go
@@ -8,6 +8,7 @@ package cluster

import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
@@ -428,13 +429,14 @@ func (sst *SST) stream_copy_to_restic() <-chan int {
return sync_channel
}

func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor, task string) {
func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
var err error
port, _ := strconv.Atoi(sv.SSTPort)

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlWarn, "SST Reseed to port %s server %s", sv.SSTPort, sv.Host)

if cluster.Conf.SchedulerReceiverUseSSL {
cluster.SSTRunSenderSSL(backupfile, sv, task)
cluster.SSTRunSenderSSL(backupfile, sv)
return
}

@@ -443,17 +445,71 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor, task
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST Reseed failed connection to port %s server %s %s ", sv.SSTPort, sv.Host, err)
return
}

defer client.Close()
file, err := os.Open(backupfile)

if strings.HasSuffix(backupfile, "gz") {
err = cluster.SSTRunSendGzip(client, backupfile, sv)
} else {
err = cluster.SSTRunSendFile(client, backupfile, sv)
}
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "%s", err.Error())
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup failed to send, closing connection!")
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent, closing connection!")
}
}

func (cluster *Cluster) SSTRunSendGzip(client net.Conn, backupfile string, sv *ServerMonitor) error {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "SST sending file: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
file, err := os.Open(backupfile)
if err != nil {
return errors.New(fmt.Sprintf("SST to server %s failed to open backup file, err: %s ", sv.URL, err))
}

sendBuffer := make([]byte, cluster.Conf.SSTSendBuffer)
//fmt.Println("Start sending file!")
var total uint64

defer file.Close()

fz, err := gzip.NewReaderN(file, cluster.Conf.SSTSendBuffer, 4)
if err != nil {
return errors.New(fmt.Sprintf("SST to server %s failed in init gzip reader, err: %s", sv.URL, err))
}
defer fz.Close()

// Read and send data in chunks
for {
n, err := fz.Read(sendBuffer)
if err != nil && err != io.EOF {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST failed to read decompressed data: %v", err)
}
if n > 0 {
// Send the chunk to the network connection
if bts, err := client.Write(sendBuffer[:n]); err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST failed to write chunk at position %d: %v", total, err)
} else {
total = total + uint64(bts)
}
}
if err == io.EOF {
break
}
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent, closing connection!")

return nil
}

func (cluster *Cluster) SSTRunSendFile(client net.Conn, backupfile string, sv *ServerMonitor) error {
file, err := os.Open(backupfile)
if os.IsNotExist(err) && cluster.Conf.CompressBackups {
backupfile = strings.Replace(backupfile, "xbtream", "gz", 1)
file, err = os.Open(backupfile)
return cluster.SSTRunSendGzip(client, backupfile, sv)
}
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
return
return errors.New(fmt.Sprintf("SST to server %s failed to open backup file, err: %s ", sv.URL, err))
}

sendBuffer := make([]byte, cluster.Conf.SSTSendBuffer)
@@ -463,18 +519,9 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor, task
defer file.Close()

for {
if strings.HasSuffix(backupfile, "gz") {
fz, err := gzip.NewReader(file)
if err != nil {
return
}
defer fz.Close()
fz.Read(sendBuffer)
} else {
_, err = file.Read(sendBuffer)
if err == io.EOF {
break
}
_, err = file.Read(sendBuffer)
if err == io.EOF {
break
}

bts, err := client.Write(sendBuffer)
@@ -485,9 +532,10 @@
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent, closing connection!")

return nil
}

func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor, task string) {
func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
var (
client *tls.Conn
err error
@@ -500,28 +548,18 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor, ta
return
}
defer client.Close()
file, err := os.Open(backupfile)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "SST sending file via SSL: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
return
}
sendBuffer := make([]byte, 16384)
var total uint64

defer file.Close()
for {
_, err = file.Read(sendBuffer)
if err == io.EOF {
break
}
bts, err := client.Write(sendBuffer)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "SST failed to write chunk %s at position %d", err, total)
}
total = total + uint64(bts)
if strings.HasSuffix(backupfile, "gz") {
err = cluster.SSTRunSendGzip(client, backupfile, sv)
} else {
err = cluster.SSTRunSendFile(client, backupfile, sv)
}
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlErr, "%s", err.Error())
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup failed to send, closing connection!")
} else {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent via SSL, closing connection!")
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, config.LvlInfo, "Backup has been sent via SSL , closing connection!")
}

func (cluster *Cluster) SSTGetSenderPort() string {
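
The sender is now split into SSTRunSendGzip for compressed backups and SSTRunSendFile for raw ones, with both SSTRunSender and SSTRunSenderSSL dispatching on the .gz suffix. Below is a self-contained sketch of the gzip path using the standard compress/gzip reader — the diff's gzip.NewReaderN is a chunked-reader variant (presumably from a parallel-gzip library) sized by Conf.SSTSendBuffer. The important detail is writing buf[:n], so a short read never sends stale bytes from the previous chunk.

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"net"
	"os"
)

// sendGzip mirrors the shape of SSTRunSendGzip: decompress a .gz backup
// and stream it over conn in bufSize chunks. Sketch only; the PR uses
// gzip.NewReaderN and replication-manager's own logging instead.
func sendGzip(conn net.Conn, path string, bufSize int) (total uint64, err error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	fz, err := gzip.NewReader(f)
	if err != nil {
		return 0, err
	}
	defer fz.Close()

	buf := make([]byte, bufSize)
	for {
		n, rerr := fz.Read(buf)
		if n > 0 {
			// Send only the n bytes just read, never the whole buffer.
			bts, werr := conn.Write(buf[:n])
			total += uint64(bts)
			if werr != nil {
				return total, werr
			}
		}
		if rerr == io.EOF {
			return total, nil
		}
		if rerr != nil {
			return total, rerr
		}
	}
}

func main() {
	// In-memory stand-in for the joiner socket; the file path is a placeholder.
	c1, c2 := net.Pipe()
	go io.Copy(io.Discard, c2)
	defer c1.Close()
	if n, err := sendGzip(c1, "backup.xbtream.gz", 16384); err != nil {
		fmt.Println("send failed after", n, "bytes:", err)
	}
}
```
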
1 change: 1 addition & 0 deletions cluster/srv.go
@@ -818,6 +818,7 @@ func (server *ServerMonitor) Refresh() error {
// Job section
server.JobsCheckFinished()
server.JobsCheckErrors()
server.JobsCheckPending()

if server.NeedRefreshJobs {
server.JobsUpdateEntries()
90 changes: 68 additions & 22 deletions cluster/srv_job.go
@@ -379,8 +379,6 @@ func (server *ServerMonitor) JobFlashbackPhysicalBackup() (int64, error) {

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlInfo, "Receive reseed physical backup %s request for server: %s", cluster.Conf.BackupPhysicalType, server.URL)

cluster.SetState("WARN0076", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(cluster.GetErrorList()["WARN0076"], cluster.Conf.BackupPhysicalType, server.URL), ErrFrom: "REJOIN", ServerUrl: server.URL})

return jobid, err
}

Expand Down Expand Up @@ -822,7 +820,7 @@ func (server *ServerMonitor) JobsCheckRunning() error {
return nil
}
//server.JobInsertTask("", "", "")
rows, err := server.Conn.Queryx("SELECT task ,count(*) as ct, max(id) as id FROM replication_manager_schema.jobs WHERE done=0 AND result IS NULL group by task ")
rows, err := server.Conn.Queryx("SELECT task ,count(*) as ct, max(id) as id FROM replication_manager_schema.jobs WHERE state=0 group by task ")
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Scheduler error fetching replication_manager_schema.jobs %s", err)
server.JobsCreateTable()
@@ -876,6 +874,42 @@
return nil
}

func (server *ServerMonitor) JobsCheckPending() error {
cluster := server.ClusterGroup
if server.IsDown() {
return nil
}

//Only cancel if not reseeding status
if server.IsReseeding {
return nil
}

//server.JobInsertTask("", "", "")
rows, err := server.Conn.Queryx("SELECT task ,count(*) as ct, max(id) as id FROM replication_manager_schema.jobs WHERE state=2 group by task ")
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Scheduler error fetching replication_manager_schema.jobs %s", err)
server.JobsCreateTable()
return err
}
defer rows.Close()
for rows.Next() {
var task DBTask
rows.Scan(&task.task, &task.ct, &task.id)
if task.ct > 0 {
switch task.task {
case "reseedxtrabackup", "reseedmariabackup", "flashbackxtrabackup", "flashbackmariabackup":
res := "Replication-manager is down while preparing task, cancelling operation for data safety."
query := "UPDATE replication_manager_schema.jobs SET state=5, result='%s' where task = '%s'"
server.ExecQueryNoBinLog(fmt.Sprintf(query, res, task.task))
server.SetNeedRefreshJobs(true)
}
}
}

return nil
}

func (server *ServerMonitor) JobsCheckErrors() error {
var err error

@@ -1872,11 +1906,11 @@ func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string, isPurge
return errors.New("Wrong configuration for Backup Binlog Method!")
}

func (server *ServerMonitor) CallbackMysqldump(dt DBTask) error {
func (server *ServerMonitor) WaitAndSendSST(task string, filename string, loop int) error {
cluster := server.ClusterGroup
var err error

rows, err := server.Conn.Queryx("SELECT done FROM replication_manager_schema.jobs WHERE id=?", dt.id)
rows, err := server.Conn.Queryx(fmt.Sprintf("SELECT done FROM replication_manager_schema.jobs WHERE task='%s' and state=%d", task, 2))
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlErr, "Scheduler error fetching replication_manager_schema.jobs %s", err)
server.JobsCreateTable()
@@ -1891,18 +1925,24 @@
count++
}

time.Sleep(time.Second * 15)
//Check if id exists
if count > 0 {
if done == 1 {
server.StartSlave()
return nil
} else {
time.Sleep(time.Second * time.Duration(cluster.Conf.MonitoringTicker))
return server.CallbackMysqldump(dt)
query := "UPDATE replication_manager_schema.jobs SET state=1, result='processing' where task = '%s'"
server.ExecQueryNoBinLog(fmt.Sprintf(query, task))
go cluster.SSTRunSender(filename, server)
return nil
} else {
if loop < 10 {
loop++
return server.WaitAndSendSST(task, filename, loop)
}
}

return err
query := "UPDATE replication_manager_schema.jobs SET state=5, result='Waiting more than max loop' where task = '%s'"
server.ExecQueryNoBinLog(fmt.Sprintf(query, task))
server.SetNeedRefreshJobs(true)
return errors.New("Error: waiting for " + task + " more than max loop.")
}

func (server *ServerMonitor) ProcessReseedPhysical() error {
@@ -1919,11 +1959,12 @@ func (server *ServerMonitor) ProcessReseedPhysical() error {
backupext = backupext + ".gz"
}

filename := master.GetMasterBackupDirectory() + cluster.Conf.BackupPhysicalType + backupext
if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, server, task)
} else {
go cluster.SSTRunSender(master.GetMasterBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, server, task)
filename = mybcksrv.GetMyBackupDirectory() + cluster.Conf.BackupPhysicalType + backupext
}

go server.WaitAndSendSST(task, filename, 0)
} else {
err = errors.New("No master found")
return err
@@ -1940,15 +1981,19 @@ func (server *ServerMonitor) ProcessFlashbackPhysical() error {
return err
} else {
mybcksrv := cluster.GetBackupServer()
server.SetInReseedBackup(true)
backupext := ".xbtream"
task := "flashback" + cluster.Conf.BackupPhysicalType
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Sending server physical backup to flashback reseed %s", server.URL)

if cluster.Conf.CompressBackups {
backupext = backupext + ".gz"
}

filename := server.GetMasterBackupDirectory() + cluster.Conf.BackupPhysicalType + backupext
if mybcksrv != nil {
go cluster.SSTRunSender(mybcksrv.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", server, task)
} else {
go cluster.SSTRunSender(server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+".xbtream", server, task)
filename = mybcksrv.GetMyBackupDirectory() + cluster.Conf.BackupPhysicalType + backupext
}

go server.WaitAndSendSST(task, filename, 0)
}
return nil
}
@@ -2023,8 +2068,9 @@ func (server *ServerMonitor) WriteJobLogs(mod int, encrypted, key, iv string) er
func (server *ServerMonitor) ParseLogEntries(entry config.LogEntry, mod int) error {
cluster := server.ClusterGroup
if entry.Server != server.URL {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlWarn, "Log entries and source mismatch: %s", server.URL)
return errors.New("Log entries and source mismatch: %s")
err := errors.New(fmt.Sprintf("Log entries and source mismatch: %s with %s", entry.Server, server.URL))
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModTask, config.LvlWarn, err.Error())
return err
}

lines := strings.Split(strings.ReplaceAll(entry.Log, "\\n", "\n"), "\n")
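
The queries above replace the old done=0 AND result IS NULL filter with a numeric state column — the mislabeled state from the PR title. The diff never declares the values as constants, so the mapping below is inferred from the SQL and the result strings; WaitAndSendSST sleeps 15 seconds per attempt and retries recursively up to 10 times on state=2 before marking the task failed.

```go
// Inferred from the queries in this diff — not constants defined by the PR.
const (
	jobQueued     = 0 // waiting/running on the node; polled by JobsCheckRunning
	jobProcessing = 1 // SST in flight (set by WaitAndSendSST once done=1)
	jobPending    = 2 // node is preparing to receive; the sender polls this state
	jobFailed     = 5 // cancelled on monitor restart or "Waiting more than max loop"
)
```
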
3 changes: 3 additions & 0 deletions server/api_cluster.go
@@ -1497,6 +1497,9 @@ func (repman *ReplicationManager) setSetting(mycluster *cluster.Cluster, name st
case "api-token-timeout":
val, _ := strconv.Atoi(value)
mycluster.SetApiTokenTimeout(val)
case "sst-send-buffer":
val, _ := strconv.Atoi(value)
mycluster.SetSSTBufferSize(val)
}
}

10 changes: 10 additions & 0 deletions share/dashboard/app/dashboard.js
@@ -316,7 +316,17 @@ app.controller('DashboardController', function (

var timeFrame = $routeParams.timeFrame;

$scope.formatBytes = function(bytes, decimals = 2) {
if (bytes === 0) return '0 Bytes';

const k = 1024;
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
const i = Math.floor(Math.log(bytes) / Math.log(k));

return parseFloat((bytes / Math.pow(k, i)).toFixed(decimals)) + ' ' + sizes[i];
}

$scope.bufferList = [1024, 2048, 4096, 8192, 16384, 32768, 65536, 1048576]
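
Presumably these are the selectable sst-send-buffer sizes, rendered through the formatBytes helper above: formatBytes(16384) yields "16 KB" and formatBytes(1048576) yields "1 MB".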

if (git_data["user"] && git_data["token"] && !AppService.hasAuthHeaders()) {
git_user.username = git_data["user"];