Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert log modules to SST #598

Merged
merged 2 commits into from
May 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions cluster/cluster_sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (cluster *Cluster) SSTWatchRestic(r io.Reader) error {
if n > 0 {
d := buf[:n]
out = append(out, d...)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, string(out))
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, string(out))
}
if err != nil {
// Read returns io.EOF at the end of file, which is not an error for us
Expand All @@ -80,31 +80,31 @@ func (cluster *Cluster) SSTRunReceiverToRestic(filename string) (string, error)

stdout, err := resticcmd.StdoutPipe()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Exiting SST on restic StdoutPipe %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Exiting SST on restic StdoutPipe %s", err)
return "", err
}
go cluster.SSTWatchRestic(stdout)
sst.outresticreader, err = resticcmd.StdinPipe()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Exiting SST on restic StdinPipe %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Exiting SST on restic StdinPipe %s", err)
return "", err
}
err = resticcmd.Start()
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Error restic command: %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Error restic command: %s", err)
return "", err
}

sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Exiting SST on socket listen %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Exiting SST on socket listen %s", err)
return "", err
}
sst.tcplistener = sst.listener.(*net.TCPListener)
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 120))
destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
if sst.cluster.Conf.LogSST {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Listening for SST on port %d", destinationPort)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Listening for SST on port %d", destinationPort)
}
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
Expand All @@ -127,7 +127,7 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
}

if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Open file failed for job %s %s", filename, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Open file failed for job %s %s", filename, err)
return "", err
}
writers = append(writers, sst.file)
Expand All @@ -136,14 +136,14 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (

sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Exiting SST on socket listen %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Exiting SST on socket listen %s", err)
return "", err
}
sst.tcplistener = sst.listener.(*net.TCPListener)
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 3600))
destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
if sst.cluster.Conf.LogSST {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Listening for SST on port to file %d", destinationPort)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Listening for SST on port to file %d", destinationPort)
}
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
Expand All @@ -157,7 +157,7 @@ func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (
sst := new(SST)
sst.cluster = cluster

cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Compressing mariadb backup")
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Compressing mariadb backup")

var err error
if openfile == ConstJobCreateFile {
Expand All @@ -171,20 +171,20 @@ func (cluster *Cluster) SSTRunReceiverToGZip(filename string, openfile string) (
sst.outfilegzipwriter = gw

if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Open file failed for job %s %s", filename, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Open file failed for job %s %s", filename, err)
return "", err
}

sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Exiting SST on socket listen %s", err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Exiting SST on socket listen %s", err)
return "", err
}
sst.tcplistener = sst.listener.(*net.TCPListener)
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 3600))
destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
if sst.cluster.Conf.LogSST {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Listening for SST on port to file %d", destinationPort)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Listening for SST on port to file %d", destinationPort)
}
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
Expand All @@ -200,7 +200,7 @@ func (sst *SST) tcp_con_handle_to_gzip() {

defer func() {
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
port := sst.listener.Addr().(*net.TCPAddr).Port
sst.tcplistener.Close()
Expand All @@ -226,7 +226,7 @@ func (sst *SST) tcp_con_handle_to_gzip() {

case <-chan_to_stdout:
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
}
}
Expand All @@ -237,7 +237,7 @@ func (sst *SST) tcp_con_handle_to_file() {

defer func() {
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
port := sst.listener.Addr().(*net.TCPAddr).Port
sst.tcplistener.Close()
Expand All @@ -262,7 +262,7 @@ func (sst *SST) tcp_con_handle_to_file() {

case <-chan_to_stdout:
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
}
}
Expand All @@ -273,7 +273,7 @@ func (sst *SST) tcp_con_handle_to_restic() {

defer func() {
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST connection end cleanup %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
port := sst.listener.Addr().(*net.TCPAddr).Port
sst.tcplistener.Close()
Expand All @@ -298,7 +298,7 @@ func (sst *SST) tcp_con_handle_to_restic() {

case <-chan_to_stdout:
if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Chan SST out for %d", sst.listener.Addr().(*net.TCPAddr).Port)
}
}
}
Expand All @@ -315,7 +315,7 @@ func (sst *SST) stream_copy_to_file() <-chan int {
if con, ok := sst.in.(net.Conn); ok {

if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
}
sst.in.(net.Conn).Close()
}
Expand All @@ -329,13 +329,13 @@ func (sst *SST) stream_copy_to_file() <-chan int {

if err != nil {
if err != io.EOF {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Read error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Read error: %s", err)
}
break
}
_, err = sst.outfilewriter.Write(buf[0:nBytes])
if err != nil {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Write error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Write error: %s", err)
}
}
}()
Expand All @@ -353,7 +353,7 @@ func (sst *SST) stream_copy_to_gzip() <-chan int {
if con, ok := sst.in.(net.Conn); ok {

if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
}
sst.in.(net.Conn).Close()
}
Expand All @@ -368,14 +368,14 @@ func (sst *SST) stream_copy_to_gzip() <-chan int {

if err != nil {
if err != io.EOF {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Read error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Read error: %s", err)
}
break
}

_, err = sst.outfilegzipwriter.Write(buf[0:nBytes])
if err != nil {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Write error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Write error: %s", err)
}
}

Expand All @@ -392,7 +392,7 @@ func (sst *SST) stream_copy_to_restic() <-chan int {
if con, ok := sst.in.(net.Conn); ok {

if sst.cluster.Conf.LogSST {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST closing connection from stream_copy %v ", con.RemoteAddr())
}
sst.in.(net.Conn).Close()
}
Expand All @@ -406,13 +406,13 @@ func (sst *SST) stream_copy_to_restic() <-chan int {

if err != nil {
if err != io.EOF {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Read error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Read error: %s", err)
}
break
}
_, err = sst.outresticreader.Write(buf[0:nBytes])
if err != nil {
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "Write error: %s", err)
sst.cluster.LogModulePrintf(sst.cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "Write error: %s", err)
}
}
}()
Expand All @@ -426,21 +426,22 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {
cluster.SSTRunSenderSSL(backupfile, sv)
return
}

client, err := net.Dial("tcp", fmt.Sprintf("%s:%d", sv.Host, port))
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST Reseed failed connection to port %s server %s %s ", sv.SSTPort, sv.Host, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST Reseed failed connection to port %s server %s %s ", sv.SSTPort, sv.Host, err)
return
}

defer client.Close()
file, err := os.Open(backupfile)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST sending file: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST sending file: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
if os.IsNotExist(err) && cluster.Conf.CompressBackups {
backupfile = strings.Replace(backupfile, "xbtream", "gz", 1)
file, err = os.Open(backupfile)
}
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
return
}

Expand All @@ -464,11 +465,11 @@ func (cluster *Cluster) SSTRunSender(backupfile string, sv *ServerMonitor) {

bts, err := client.Write(sendBuffer)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST failed to write chunk %s at position %d", err, total)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST failed to write chunk %s at position %d", err, total)
}
total = total + uint64(bts)
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Backup has been sent, closing connection!")
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Backup has been sent, closing connection!")

defer file.Close()

Expand All @@ -483,14 +484,14 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {

tlsconfig := &tls.Config{InsecureSkipVerify: true}
if client, err = tls.Dial("tcp", fmt.Sprintf("%s:%d", sv.Host, port), tlsconfig); err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST Reseed failed connection via SSL to port %s server %s %s ", sv.SSTPort, sv.Host, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST Reseed failed connection via SSL to port %s server %s %s ", sv.SSTPort, sv.Host, err)
return
}
defer client.Close()
file, err := os.Open(backupfile)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "SST sending file via SSL: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "SST sending file via SSL: %s to node: %s port: %s", backupfile, sv.Host, sv.SSTPort)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST failed to open backup file server %s %s ", sv.URL, err)
return
}
sendBuffer := make([]byte, 16384)
Expand All @@ -502,11 +503,11 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
}
bts, err := client.Write(sendBuffer)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlErr, "SST failed to write chunk %s at position %d", err, total)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlErr, "SST failed to write chunk %s at position %d", err, total)
}
total = total + uint64(bts)
}
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModBackupStream, LvlInfo, "Backup has been sent via SSL , closing connection!")
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModSST, LvlInfo, "Backup has been sent via SSL , closing connection!")
}

func (cluster *Cluster) SSTGetSenderPort() string {
Expand Down