Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup physical and binlogs to restic #624

Merged
merged 4 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type Cluster struct {
tunnel *ssh.Client `json:"-"`
QueryRules map[uint32]config.QueryRule `json:"-"`
Backups []v3.Backup `json:"-"`
BackupStat v3.BackupStat `json:"backupStat"`
SLAHistory []state.Sla `json:"slaHistory"`
APIUsers map[string]APIUser `json:"apiUsers"`
Schedule map[string]cron.Entry `json:"-"`
Expand Down
9 changes: 8 additions & 1 deletion cluster/cluster_bash.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,20 @@ func (cluster *Cluster) BinlogRotationScript(srv *ServerMonitor) error {
return nil
}

func (cluster *Cluster) BinlogCopyScript(srv *ServerMonitor, binlog string) error {
func (cluster *Cluster) BinlogCopyScript(srv *ServerMonitor, binlog string, isPurge bool) error {
if cluster.Conf.BinlogCopyScript != "" {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "INFO", "Calling binlog copy script on %s. Binlog: %s", srv.URL, binlog)
var out []byte
out, err := exec.Command(cluster.Conf.BinlogCopyScript, cluster.Name, srv.Host, srv.Port, strconv.Itoa(cluster.Conf.OnPremiseSSHPort), srv.BinaryLogDir, srv.GetMyBackupDirectory(), binlog).CombinedOutput()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "ERROR", "%s", err)
} else {
// Skip backup to restic if in purge binlog
if !isPurge {
// Backup to restic when no error (defer to prevent unfinished physical copy)
backtype := "binlog"
defer srv.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, srv.DBVersion.Flavor, srv.DBVersion.ToString(), backtype)
}
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "INFO", "Binlog copy script complete: %s", string(out))
Expand Down
53 changes: 53 additions & 0 deletions cluster/cluster_bck.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,60 @@ func (cluster *Cluster) ResticFetchRepo() error {
}
}
cluster.Backups = filterRepo

cluster.ResticFetchRepoStat()
}

return nil
}

func (cluster *Cluster) ResticFetchRepoStat() error {

// defer func() { cluster.canResticFetchRepo = true }()
// if cluster.Conf.BackupRestic && cluster.canResticFetchRepo {
cluster.canResticFetchRepo = false
// var stdout, stderr []byte
var stdoutBuf, stderrBuf bytes.Buffer
var errStdout, errStderr error
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "stats", "--mode", "raw-data", "--json")
stdoutIn, _ := resticcmd.StdoutPipe()
stderrIn, _ := resticcmd.StderrPipe()
stdout := io.MultiWriter(os.Stdout, &stdoutBuf)
stderr := io.MultiWriter(os.Stderr, &stderrBuf)

resticcmd.Env = cluster.ResticGetEnv()
if err := resticcmd.Start(); err != nil {
cluster.SetState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
return err
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, errStdout = io.Copy(stdout, stdoutIn)
wg.Done()
}()

_, errStderr = io.Copy(stderr, stderrIn)
wg.Wait()

err := resticcmd.Wait()
if err != nil {
cluster.StateMachine.AddState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
cluster.ResticInitRepo()
return err
}
if errStdout != nil || errStderr != nil {
return errors.New("failed to capture stdout or stderr\n")
}

var repostat v3.BackupStat
err = json.Unmarshal(stdoutBuf.Bytes(), &repostat)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Error unmarshal backups %s", err)
return err
}
cluster.BackupStat = repostat
// }

return nil
}
18 changes: 12 additions & 6 deletions cluster/cluster_sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (cluster *Cluster) SSTRunReceiverToRestic(filename string) (string, error)
return strconv.Itoa(destinationPort), nil
}

func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (string, error) {
func (cluster *Cluster) SSTRunReceiverToFile(server *ServerMonitor, filename string, openfile string) (string, error) {
sst := new(SST)
sst.cluster = cluster
var writers []io.Writer
Expand Down Expand Up @@ -148,12 +148,12 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
SSTs.Unlock()
go sst.tcp_con_handle_to_file()
go sst.tcp_con_handle_to_file(server)

return strconv.Itoa(destinationPort), nil
}

func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (string, error) {
func (cluster *Cluster) SSTRunReceiverToGZip(server *ServerMonitor, filename string, openfile string) (string, error) {
sst := new(SST)
sst.cluster = cluster

Expand Down Expand Up @@ -189,12 +189,12 @@ func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
SSTs.Unlock()
go sst.tcp_con_handle_to_gzip()
go sst.tcp_con_handle_to_gzip(server)

return strconv.Itoa(destinationPort), nil
}

func (sst *SST) tcp_con_handle_to_gzip() {
func (sst *SST) tcp_con_handle_to_gzip(server *ServerMonitor) {

var err error

Expand All @@ -211,6 +211,9 @@ func (sst *SST) tcp_con_handle_to_gzip() {
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()

backtype := "physical"
server.BackupRestic(sst.cluster.Conf.Cloud18GitUser, sst.cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, sst.cluster.Conf.BackupPhysicalType)
}()

sst.in, err = sst.listener.Accept()
Expand All @@ -231,7 +234,7 @@ func (sst *SST) tcp_con_handle_to_gzip() {
}
}

func (sst *SST) tcp_con_handle_to_file() {
func (sst *SST) tcp_con_handle_to_file(server *ServerMonitor) {

var err error

Expand All @@ -247,6 +250,9 @@ func (sst *SST) tcp_con_handle_to_file() {
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()

backtype := "physical"
server.BackupRestic(sst.cluster.Conf.Cloud18GitUser, sst.cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, sst.cluster.Conf.BackupPhysicalType)
}()

sst.in, err = sst.listener.Accept()
Expand Down
4 changes: 3 additions & 1 deletion cluster/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,9 @@ func (server *ServerMonitor) Refresh() error {
}

if cluster.Conf.BackupBinlogs {
server.InitiateJobBackupBinlog(server.BinaryLogFilePrevious)
//Set second parameter to false, not part of backupbinlogpurge
server.InitiateJobBackupBinlog(server.BinaryLogFilePrevious, false)
//Initiate purging backup binlog
go server.JobBackupBinlogPurge(server.BinaryLogFilePrevious)
}

Expand Down
54 changes: 39 additions & 15 deletions cluster/srv_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (server *ServerMonitor) JobBackupPhysical() (int64, error) {
var backupext string = ".xbtream"
if cluster.Conf.CompressBackups {
backupext = backupext + ".gz"
port, err = cluster.SSTRunReceiverToGZip(server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
port, err = cluster.SSTRunReceiverToGZip(server, server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
if err != nil {
return 0, nil
}
} else {
port, err = cluster.SSTRunReceiverToFile(server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
port, err = cluster.SSTRunReceiverToFile(server, server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
if err != nil {
return 0, nil
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func (server *ServerMonitor) JobBackupErrorLog() (int64, error) {
if server.IsDown() {
return 0, nil
}
port, err := cluster.SSTRunReceiverToFile(server.Datadir+"/log/log_error.log", ConstJobAppendFile)
port, err := cluster.SSTRunReceiverToFile(server, server.Datadir+"/log/log_error.log", ConstJobAppendFile)
if err != nil {
return 0, nil
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func (server *ServerMonitor) JobBackupSlowQueryLog() (int64, error) {
if server.IsDown() {
return 0, nil
}
port, err := cluster.SSTRunReceiverToFile(server.Datadir+"/log/log_slow_query.log", ConstJobAppendFile)
port, err := cluster.SSTRunReceiverToFile(server, server.Datadir+"/log/log_slow_query.log", ConstJobAppendFile)
if err != nil {
return 0, nil
}
Expand Down Expand Up @@ -850,7 +850,8 @@ func (server *ServerMonitor) JobBackupLogical() error {
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Finish logical backup %s for: %s", cluster.Conf.BackupLogicalType, server.URL)
server.BackupRestic()
backtype := "logical"
server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, cluster.Conf.BackupLogicalType)
return nil
}

Expand All @@ -867,13 +868,24 @@ func (server *ServerMonitor) copyLogs(r io.Reader) {
}
}

func (server *ServerMonitor) BackupRestic() error {
func (server *ServerMonitor) BackupRestic(tags ...string) error {
cluster := server.ClusterGroup
var stdout, stderr []byte
var errStdout, errStderr error

if cluster.Conf.BackupRestic {
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "backup", server.GetMyBackupDirectory())
args := make([]string, 0)

args = append(args, "backup")
for _, tag := range tags {
if tag != "" {
args = append(args, "--tag")
args = append(args, tag)
}
}
args = append(args, server.GetMyBackupDirectory())

resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, args...)

stdoutIn, _ := resticcmd.StdoutPipe()
stderrIn, _ := resticcmd.StderrPipe()
Expand Down Expand Up @@ -1006,7 +1018,7 @@ func (server *ServerMonitor) JobRunViaSSH() error {
return nil
}

func (server *ServerMonitor) JobBackupBinlog(binlogfile string) error {
func (server *ServerMonitor) JobBackupBinlog(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup
if !server.IsMaster() {
return errors.New("Copy only master binlog")
Expand Down Expand Up @@ -1037,7 +1049,12 @@ func (server *ServerMonitor) JobBackupBinlog(binlogfile string) error {
return cmdrunErr
}

// Get
//Skip copying to resting when purge due to batching
if !isPurge {
// Backup to restic when no error (defer to prevent unfinished physical copy)
backtype := "binlog"
defer server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype)
}

return nil
}
Expand All @@ -1060,7 +1077,8 @@ func (server *ServerMonitor) JobBackupBinlogPurge(binlogfile string) error {
if _, err := os.Stat(server.GetMyBackupDirectory() + "/" + filename); os.IsNotExist(err) {
if _, ok := server.BinaryLogFiles[filename]; ok {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Backup master missing binlog of %s,%s", server.URL, filename)
server.InitiateJobBackupBinlog(filename)
//Set true to skip sending to resting multiple times
server.InitiateJobBackupBinlog(filename, true)
}
}
keeping[filename] = binlogfilestop
Expand Down Expand Up @@ -1212,7 +1230,7 @@ func (cluster *Cluster) JobRejoinMysqldumpFromSource(source *ServerMonitor, dest
return nil
}

func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string) error {
func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup
if !server.IsMaster() {
return errors.New("Copy only master binlog")
Expand Down Expand Up @@ -1261,10 +1279,16 @@ func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string) error {
return err
}

//Skip copying to resting when purge due to batching
if !isPurge {
// Backup to restic when no error (defer to prevent unfinished physical copy)
backtype := "binlog"
defer server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype)
}
return nil
}

func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string) error {
func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup

if server.BinaryLogDir == "" {
Expand All @@ -1283,11 +1307,11 @@ func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string) error {

switch cluster.Conf.BinlogCopyMode {
case "client", "mysqlbinlog":
return server.JobBackupBinlog(binlogfile)
return server.JobBackupBinlog(binlogfile, isPurge)
case "ssh":
return server.JobBackupBinlogSSH(binlogfile)
return server.JobBackupBinlogSSH(binlogfile, isPurge)
case "script":
return cluster.BinlogCopyScript(server, binlogfile)
return cluster.BinlogCopyScript(server, binlogfile, isPurge)
}

return errors.New("Wrong configuration for Backup Binlog Method!")
Expand Down
7 changes: 7 additions & 0 deletions repmanv3/messages.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions share/dashboard/app/dashboard.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ app.controller('DashboardController', function (
{ id: '6', name: 'SAT' },
];

$scope.humanFileSize = function(size) {
var i = size == 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024));
return +((size / Math.pow(1024, i)).toFixed(2)) * 1 + ' ' + ['B', 'kB', 'MB', 'GB', 'TB'][i];
}

var getClusterUrl = function () {
return AppService.getClusterUrl($scope.selectedClusterName);
};
Expand Down
Loading