
Commit

Merge pull request #624 from signal18/restic
backup physical and binlogs to restic
svaroqui committed Jun 6, 2024
2 parents 3abd2df + 709b964 commit c42145c
Showing 9 changed files with 174 additions and 44 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
@@ -172,6 +172,7 @@ type Cluster struct {
tunnel *ssh.Client `json:"-"`
QueryRules map[uint32]config.QueryRule `json:"-"`
Backups []v3.Backup `json:"-"`
BackupStat v3.BackupStat `json:"backupStat"`
SLAHistory []state.Sla `json:"slaHistory"`
APIUsers map[string]APIUser `json:"apiUsers"`
Schedule map[string]cron.Entry `json:"-"`
9 changes: 8 additions & 1 deletion cluster/cluster_bash.go
@@ -108,13 +108,20 @@ func (cluster *Cluster) BinlogRotationScript(srv *ServerMonitor) error {
return nil
}

func (cluster *Cluster) BinlogCopyScript(srv *ServerMonitor, binlog string) error {
func (cluster *Cluster) BinlogCopyScript(srv *ServerMonitor, binlog string, isPurge bool) error {
if cluster.Conf.BinlogCopyScript != "" {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "INFO", "Calling binlog copy script on %s. Binlog: %s", srv.URL, binlog)
var out []byte
out, err := exec.Command(cluster.Conf.BinlogCopyScript, cluster.Name, srv.Host, srv.Port, strconv.Itoa(cluster.Conf.OnPremiseSSHPort), srv.BinaryLogDir, srv.GetMyBackupDirectory(), binlog).CombinedOutput()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "ERROR", "%s", err)
} else {
// Skip the restic backup when this copy is triggered by the binlog purge
if !isPurge {
// Back up to restic when there is no error (deferred so an unfinished copy is not snapshotted)
backtype := "binlog"
defer srv.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, srv.DBVersion.Flavor, srv.DBVersion.ToString(), backtype)
}
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, "INFO", "Binlog copy script complete: %s", string(out))
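The new isPurge parameter lets the purge job copy missing binlogs without pushing each one to restic. Below is a minimal Go sketch of that pattern under assumed names (copyBinlog and pushToRestic are illustrative, not the repository's API; the real code defers srv.BackupRestic after the copy script succeeds):

package main

import (
	"fmt"
	"os/exec"
)

// copyBinlog sketches the flow of BinlogCopyScript: run the external copy
// script, and only when it succeeds and the call is not part of a purge,
// defer the restic push so it runs after the remaining logging in this
// function has finished. All identifiers here are illustrative.
func copyBinlog(script, binlog string, isPurge bool, pushToRestic func(tags ...string) error) error {
	out, err := exec.Command(script, binlog).CombinedOutput()
	if err != nil {
		return fmt.Errorf("binlog copy script failed: %w (%s)", err, out)
	}
	if !isPurge {
		// deferred: the snapshot is taken once this function returns
		defer pushToRestic("binlog")
	}
	fmt.Printf("binlog copy script complete: %s\n", out)
	return nil
}

func main() {
	push := func(tags ...string) error {
		fmt.Println("restic backup --tag", tags)
		return nil
	}
	// regular copy pushes to restic; a purge-driven copy skips the push
	_ = copyBinlog("/bin/true", "mysql-bin.000042", false, push)
	_ = copyBinlog("/bin/true", "mysql-bin.000042", true, push)
}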
53 changes: 53 additions & 0 deletions cluster/cluster_bck.go
@@ -183,7 +183,60 @@ func (cluster *Cluster) ResticFetchRepo() error {
}
}
cluster.Backups = filterRepo

cluster.ResticFetchRepoStat()
}

return nil
}

func (cluster *Cluster) ResticFetchRepoStat() error {

// defer func() { cluster.canResticFetchRepo = true }()
// if cluster.Conf.BackupRestic && cluster.canResticFetchRepo {
cluster.canResticFetchRepo = false
// var stdout, stderr []byte
var stdoutBuf, stderrBuf bytes.Buffer
var errStdout, errStderr error
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "stats", "--mode", "raw-data", "--json")
stdoutIn, _ := resticcmd.StdoutPipe()
stderrIn, _ := resticcmd.StderrPipe()
stdout := io.MultiWriter(os.Stdout, &stdoutBuf)
stderr := io.MultiWriter(os.Stderr, &stderrBuf)

resticcmd.Env = cluster.ResticGetEnv()
if err := resticcmd.Start(); err != nil {
cluster.SetState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
return err
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, errStdout = io.Copy(stdout, stdoutIn)
wg.Done()
}()

_, errStderr = io.Copy(stderr, stderrIn)
wg.Wait()

err := resticcmd.Wait()
if err != nil {
cluster.StateMachine.AddState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
cluster.ResticInitRepo()
return err
}
if errStdout != nil || errStderr != nil {
return errors.New("failed to capture stdout or stderr\n")
}

var repostat v3.BackupStat
err = json.Unmarshal(stdoutBuf.Bytes(), &repostat)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Error unmarshalling restic stats: %s", err)
return err
}
cluster.BackupStat = repostat
// }

return nil
}
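ResticFetchRepoStat shells out to restic stats --mode raw-data --json and stores the decoded result in the cluster's new BackupStat field. Below is a standalone sketch of the same call; the struct fields follow restic's documented stats output (total_size, total_file_count) and are only an assumption about what v3.BackupStat carries.

package main

import (
	"encoding/json"
	"fmt"
	"os/exec"
)

// repoStat holds the fields expected from `restic stats --json`;
// the repository's v3.BackupStat may carry more or differently named fields.
type repoStat struct {
	TotalSize      uint64 `json:"total_size"`
	TotalFileCount uint64 `json:"total_file_count"`
}

// fetchRepoStat mirrors the shape of ResticFetchRepoStat: run restic with the
// repository credentials in the environment and decode the JSON report.
func fetchRepoStat(resticBin string, env []string) (repoStat, error) {
	var st repoStat
	cmd := exec.Command(resticBin, "stats", "--mode", "raw-data", "--json")
	cmd.Env = env // e.g. RESTIC_REPOSITORY, RESTIC_PASSWORD
	out, err := cmd.Output()
	if err != nil {
		return st, fmt.Errorf("restic stats failed: %w", err)
	}
	if err := json.Unmarshal(out, &st); err != nil {
		return st, fmt.Errorf("decode restic stats: %w", err)
	}
	return st, nil
}

func main() {
	st, err := fetchRepoStat("restic", nil) // assumes restic is on PATH and the repo env vars are set
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("repository raw data: %d bytes in %d files\n", st.TotalSize, st.TotalFileCount)
}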
18 changes: 12 additions & 6 deletions cluster/cluster_sst.go
@@ -114,7 +114,7 @@ func (cluster *Cluster) SSTRunReceiverToRestic(filename string) (string, error)
return strconv.Itoa(destinationPort), nil
}

func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (string, error) {
func (cluster *Cluster) SSTRunReceiverToFile(server *ServerMonitor, filename string, openfile string) (string, error) {
sst := new(SST)
sst.cluster = cluster
var writers []io.Writer
@@ -148,12 +148,12 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
SSTs.Unlock()
go sst.tcp_con_handle_to_file()
go sst.tcp_con_handle_to_file(server)

return strconv.Itoa(destinationPort), nil
}

func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (string, error) {
func (cluster *Cluster) SSTRunReceiverToGZip(server *ServerMonitor, filename string, openfile string) (string, error) {
sst := new(SST)
sst.cluster = cluster

@@ -189,12 +189,12 @@ func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
SSTs.Unlock()
go sst.tcp_con_handle_to_gzip()
go sst.tcp_con_handle_to_gzip(server)

return strconv.Itoa(destinationPort), nil
}

func (sst *SST) tcp_con_handle_to_gzip() {
func (sst *SST) tcp_con_handle_to_gzip(server *ServerMonitor) {

var err error

@@ -211,6 +211,9 @@ func (sst *SST) tcp_con_handle_to_gzip() {
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()

backtype := "physical"
server.BackupRestic(sst.cluster.Conf.Cloud18GitUser, sst.cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, sst.cluster.Conf.BackupPhysicalType)
}()

sst.in, err = sst.listener.Accept()
@@ -231,7 +234,7 @@ }
}
}

func (sst *SST) tcp_con_handle_to_file() {
func (sst *SST) tcp_con_handle_to_file(server *ServerMonitor) {

var err error

@@ -247,6 +250,9 @@ func (sst *SST) tcp_con_handle_to_file() {
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()

backtype := "physical"
server.BackupRestic(sst.cluster.Conf.Cloud18GitUser, sst.cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, sst.cluster.Conf.BackupPhysicalType)
}()

sst.in, err = sst.listener.Accept()
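The SST receivers now take the target ServerMonitor so the deferred cleanup can push the finished stream to restic. The sketch below (receiveToGzip and uploadBackup are assumed names, not the repository's API) shows why that call belongs inside the deferred block: it only fires after the gzip writer and file are closed, so restic snapshots a complete copy.

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"net"
	"os"
)

// receiveToGzip sketches tcp_con_handle_to_gzip: accept one connection,
// stream its payload into a gzip file, and trigger the backup upload from
// the same deferred cleanup, so the snapshot only starts after the file is
// flushed and closed. uploadBackup stands in for server.BackupRestic.
func receiveToGzip(ln net.Listener, path string, uploadBackup func(tags ...string)) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	gz := gzip.NewWriter(f)

	defer func() {
		gz.Close()
		f.Close()
		ln.Close()
		// the on-disk copy is complete at this point, so the snapshot is consistent
		uploadBackup("physical", "xtrabackup") // illustrative tags only
	}()

	conn, err := ln.Accept()
	if err != nil {
		return err
	}
	defer conn.Close()

	_, err = io.Copy(gz, conn)
	return err
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		fmt.Println(err)
		return
	}
	upload := func(tags ...string) { fmt.Println("restic backup --tag", tags) }

	// send a small payload to ourselves so the example terminates
	go func() {
		conn, err := net.Dial("tcp", ln.Addr().String())
		if err != nil {
			return
		}
		conn.Write([]byte("fake xbstream payload"))
		conn.Close()
	}()

	if err := receiveToGzip(ln, os.TempDir()+"/sst-example.gz", upload); err != nil {
		fmt.Println(err)
	}
}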
4 changes: 3 additions & 1 deletion cluster/srv.go
@@ -802,7 +802,9 @@ func (server *ServerMonitor) Refresh() error {
}

if cluster.Conf.BackupBinlogs {
server.InitiateJobBackupBinlog(server.BinaryLogFilePrevious)
// Second parameter is false: this call is not part of the binlog backup purge
server.InitiateJobBackupBinlog(server.BinaryLogFilePrevious, false)
// Initiate the binlog backup purge
go server.JobBackupBinlogPurge(server.BinaryLogFilePrevious)
}

54 changes: 39 additions & 15 deletions cluster/srv_job.go
@@ -118,12 +118,12 @@ func (server *ServerMonitor) JobBackupPhysical() (int64, error) {
var backupext string = ".xbtream"
if cluster.Conf.CompressBackups {
backupext = backupext + ".gz"
port, err = cluster.SSTRunReceiverToGZip(server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
port, err = cluster.SSTRunReceiverToGZip(server, server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
if err != nil {
return 0, nil
}
} else {
port, err = cluster.SSTRunReceiverToFile(server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
port, err = cluster.SSTRunReceiverToFile(server, server.GetMyBackupDirectory()+cluster.Conf.BackupPhysicalType+backupext, ConstJobCreateFile)
if err != nil {
return 0, nil
}
@@ -311,7 +311,7 @@ func (server *ServerMonitor) JobBackupErrorLog() (int64, error) {
if server.IsDown() {
return 0, nil
}
port, err := cluster.SSTRunReceiverToFile(server.Datadir+"/log/log_error.log", ConstJobAppendFile)
port, err := cluster.SSTRunReceiverToFile(server, server.Datadir+"/log/log_error.log", ConstJobAppendFile)
if err != nil {
return 0, nil
}
@@ -378,7 +378,7 @@ func (server *ServerMonitor) JobBackupSlowQueryLog() (int64, error) {
if server.IsDown() {
return 0, nil
}
port, err := cluster.SSTRunReceiverToFile(server.Datadir+"/log/log_slow_query.log", ConstJobAppendFile)
port, err := cluster.SSTRunReceiverToFile(server, server.Datadir+"/log/log_slow_query.log", ConstJobAppendFile)
if err != nil {
return 0, nil
}
@@ -850,7 +850,8 @@ func (server *ServerMonitor) JobBackupLogical() error {
}

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Finish logical backup %s for: %s", cluster.Conf.BackupLogicalType, server.URL)
server.BackupRestic()
backtype := "logical"
server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype, cluster.Conf.BackupLogicalType)
return nil
}

@@ -867,13 +868,24 @@ func (server *ServerMonitor) copyLogs(r io.Reader) {
}
}

func (server *ServerMonitor) BackupRestic() error {
func (server *ServerMonitor) BackupRestic(tags ...string) error {
cluster := server.ClusterGroup
var stdout, stderr []byte
var errStdout, errStderr error

if cluster.Conf.BackupRestic {
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "backup", server.GetMyBackupDirectory())
args := make([]string, 0)

args = append(args, "backup")
for _, tag := range tags {
if tag != "" {
args = append(args, "--tag")
args = append(args, tag)
}
}
args = append(args, server.GetMyBackupDirectory())

resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, args...)

stdoutIn, _ := resticcmd.StdoutPipe()
stderrIn, _ := resticcmd.StderrPipe()
@@ -1006,7 +1018,7 @@ func (server *ServerMonitor) JobRunViaSSH() error {
return nil
}

func (server *ServerMonitor) JobBackupBinlog(binlogfile string) error {
func (server *ServerMonitor) JobBackupBinlog(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup
if !server.IsMaster() {
return errors.New("Copy only master binlog")
@@ -1037,7 +1049,12 @@ func (server *ServerMonitor) JobBackupBinlog(binlogfile string) error {
return cmdrunErr
}

// Get
// Skip copying to restic during purge, since purge batches the copies
if !isPurge {
// Back up to restic when there is no error (deferred so an unfinished copy is not snapshotted)
backtype := "binlog"
defer server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype)
}

return nil
}
@@ -1060,7 +1077,8 @@ func (server *ServerMonitor) JobBackupBinlogPurge(binlogfile string) error {
if _, err := os.Stat(server.GetMyBackupDirectory() + "/" + filename); os.IsNotExist(err) {
if _, ok := server.BinaryLogFiles[filename]; ok {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Backup master missing binlog of %s,%s", server.URL, filename)
server.InitiateJobBackupBinlog(filename)
// Set true to skip sending to restic multiple times
server.InitiateJobBackupBinlog(filename, true)
}
}
keeping[filename] = binlogfilestop
@@ -1212,7 +1230,7 @@ func (cluster *Cluster) JobRejoinMysqldumpFromSource(source *ServerMonitor, dest
return nil
}

func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string) error {
func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup
if !server.IsMaster() {
return errors.New("Copy only master binlog")
@@ -1261,10 +1279,16 @@ func (server *ServerMonitor) JobBackupBinlogSSH(binlogfile string) error {
return err
}

// Skip copying to restic during purge, since purge batches the copies
if !isPurge {
// Back up to restic when there is no error (deferred so an unfinished copy is not snapshotted)
backtype := "binlog"
defer server.BackupRestic(cluster.Conf.Cloud18GitUser, cluster.Name, server.DBVersion.Flavor, server.DBVersion.ToString(), backtype)
}
return nil
}

func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string) error {
func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string, isPurge bool) error {
cluster := server.ClusterGroup

if server.BinaryLogDir == "" {
@@ -1283,11 +1307,11 @@ func (server *ServerMonitor) InitiateJobBackupBinlog(binlogfile string) error {

switch cluster.Conf.BinlogCopyMode {
case "client", "mysqlbinlog":
return server.JobBackupBinlog(binlogfile)
return server.JobBackupBinlog(binlogfile, isPurge)
case "ssh":
return server.JobBackupBinlogSSH(binlogfile)
return server.JobBackupBinlogSSH(binlogfile, isPurge)
case "script":
return cluster.BinlogCopyScript(server, binlogfile)
return cluster.BinlogCopyScript(server, binlogfile, isPurge)
}

return errors.New("Wrong configuration for Backup Binlog Method!")
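BackupRestic now accepts variadic tags that are expanded into repeated --tag flags ahead of the backup path, and empty tags (for example an unset Cloud18GitUser) are dropped. A small sketch of that argument construction, with illustrative values only:

package main

import (
	"fmt"
	"os/exec"
)

// buildResticArgs mirrors how BackupRestic turns its variadic tags into
// repeated --tag flags; empty tags are skipped so optional values can be
// passed through unconditionally.
func buildResticArgs(backupDir string, tags ...string) []string {
	args := []string{"backup"}
	for _, tag := range tags {
		if tag != "" {
			args = append(args, "--tag", tag)
		}
	}
	return append(args, backupDir)
}

func main() {
	// illustrative tag values: git user (empty here), cluster, flavor, version, kind, method
	args := buildResticArgs("/var/lib/replication-manager/backups/db1",
		"", "cluster1", "MariaDB", "10.6.17", "logical", "mysqldump")
	cmd := exec.Command("restic", args...)
	fmt.Println(cmd.String())
	// prints: restic backup --tag cluster1 --tag MariaDB --tag 10.6.17 --tag logical --tag mysqldump /var/lib/replication-manager/backups/db1
}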
7 changes: 7 additions & 0 deletions repmanv3/messages.pb.go

Generated file; diff not rendered by default.

5 changes: 5 additions & 0 deletions share/dashboard/app/dashboard.js
@@ -240,6 +240,11 @@ app.controller('DashboardController', function (
{ id: '6', name: 'SAT' },
];

$scope.humanFileSize = function(size) {
var i = size == 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024));
return +((size / Math.pow(1024, i)).toFixed(2)) * 1 + ' ' + ['B', 'kB', 'MB', 'GB', 'TB'][i];
}

var getClusterUrl = function () {
return AppService.getClusterUrl($scope.selectedClusterName);
};
