Skip to content

Commit

Permalink
Enable to custumize mydumper myloader extra parameters
Browse files Browse the repository at this point in the history
myloader add enable binlog that was brecking replication on restaure backup
Enable to custumize db job script via parameter to specify script to run
Unify export env variable to all ssh execution add REPLICATION_MANAGER_HOST_NAME REPLICATION_MANAGER_HOST_USER REPLICATION_MANAGER_HOST_PASSWORD REPLICATION_MANAGER_HOST_PORT
  • Loading branch information
svaroqui committed Jul 29, 2022
1 parent a0d767a commit b893fc4
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 69 deletions.
46 changes: 13 additions & 33 deletions cluster/prov_onpremise_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ func (cluster *Cluster) OnPremiseStopDatabaseService(server *ServerMonitor) erro
}

func (cluster *Cluster) OnPremiseSetEnv(client *sshclient.Client, server *ServerMonitor) error {
adminuser := "admin"
adminpassword := "repman"

if user, ok := server.ClusterGroup.APIUsers[adminuser]; ok {
adminpassword = user.Password
}
buf := strings.NewReader("export MYSQL_ROOT_PASSWORD=\"" + server.Pass + "\";export REPLICATION_MANAGER_URL=\"https://" + server.ClusterGroup.Conf.MonitorAddress + ":" + server.ClusterGroup.Conf.APIPort + "\";export REPLICATION_MANAGER_USER=\"" + adminuser + "\";export REPLICATION_MANAGER_PASSWORD=\"" + adminpassword + "\";export REPLICATION_MANAGER_HOST_NAME=\"" + server.Host + "\";export REPLICATION_MANAGER_HOST_PORT=\"" + server.Port + "\";export REPLICATION_MANAGER_CLUSTER_NAME=\"" + server.ClusterGroup.Name + "\"")
/* REPLICATION_MANAGER_USER
REPLICATION_MANAGER_PASSWORD
REPLICATION_MANAGER_URL
REPLICATION_MANAGER_CLUSTER_NAME
REPLICATION_MANAGER_HOST_NAME
REPLICATION_MANAGER_HOST_PORT */
buf := strings.NewReader(server.GetSshEnv())
/*
REPLICATION_MANAGER_USER
REPLICATION_MANAGER_PASSWORD
REPLICATION_MANAGER_URL
REPLICATION_MANAGER_CLUSTER_NAME
REPLICATION_MANAGER_HOST_NAME
REPLICATION_MANAGER_HOST_USER
REPLICATION_MANAGER_HOST_PASSWORD
REPLICATION_MANAGER_HOST_PORT
*/
var (
stdout bytes.Buffer
stderr bytes.Buffer
Expand All @@ -122,15 +122,6 @@ func (cluster *Cluster) OnPremiseSetEnv(client *sshclient.Client, server *Server
}
server.ClusterGroup.LogPrintf(LvlInfo, "OnPremise start database install secret env: %s", stdout.String())

/*out := stdout.String()
out, err := client.Cmd("export MYSQL_ROOT_PASSWORD=" + server.Pass).Cmd("export REPLICATION_MANAGER_URL=https://" + server.ClusterGroup.Conf.MonitorAddress + ":" + server.ClusterGroup.Conf.APIPort).Cmd("export REPLICATION_MANAGER_USER=" + adminuser).Cmd("export REPLICATION_MANAGER_PASSWORD=" + adminpassword).Cmd("export REPLICATION_MANAGER_HOST_NAME=" + server.Host).Cmd("export REPLICATION_MANAGER_HOST_PORT=" + server.Port).Cmd("export REPLICATION_MANAGER_CLUSTER_NAME=" + server.ClusterGroup.Name).SmartOutput()
if err != nil {
server.ClusterGroup.LogPrintf(LvlErr, "OnPremise start database : %s", err)
return err
server.ClusterGroup.LogPrintf(LvlInfo, "OnPremise start database install secret env: %s", string(out))
}*/

return nil
}

Expand All @@ -156,12 +147,7 @@ func (cluster *Cluster) OnPremiseStartDatabaseService(server *ServerMonitor) err
buf := new(bytes.Buffer)
buf.ReadFrom(filerc)

adminuser := "admin"
adminpassword := "repman"
if user, ok := server.ClusterGroup.APIUsers[adminuser]; ok {
adminpassword = user.Password
}
buf2 := strings.NewReader("export MYSQL_ROOT_PASSWORD=\"" + server.Pass + "\";export REPLICATION_MANAGER_URL=\"https://" + server.ClusterGroup.Conf.MonitorAddress + ":" + server.ClusterGroup.Conf.APIPort + "\";export REPLICATION_MANAGER_USER=\"" + adminuser + "\";export REPLICATION_MANAGER_PASSWORD=\"" + adminpassword + "\";export REPLICATION_MANAGER_HOST_NAME=\"" + server.Host + "\";export REPLICATION_MANAGER_HOST_PORT=\"" + server.Port + "\";export REPLICATION_MANAGER_CLUSTER_NAME=\"" + server.ClusterGroup.Name + "\"\n")
buf2 := strings.NewReader(server.GetSshEnv())
r := io.MultiReader(buf2, buf)

var (
Expand All @@ -174,12 +160,6 @@ func (cluster *Cluster) OnPremiseStartDatabaseService(server *ServerMonitor) err
out := stdout.String()

server.ClusterGroup.LogPrintf(LvlInfo, "OnPremise start script: %s ,out: %s ,err: %s", cmd, out, stderr.String())
/* out, err := client.Cmd(cmd).SmartOutput()
if err != nil {
server.ClusterGroup.LogPrintf(LvlErr, "OnPremise start database : %s", err)
return err
}

*/
return nil
}
20 changes: 20 additions & 0 deletions cluster/srv_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ func (server *ServerMonitor) GetProcessList() []dbhelper.Processlist {
return server.FullProcessList
}

func (server *ServerMonitor) GetSshEnv() string {
/*
REPLICATION_MANAGER_USER
REPLICATION_MANAGER_PASSWORD
REPLICATION_MANAGER_URL
REPLICATION_MANAGER_CLUSTER_NAME
REPLICATION_MANAGER_HOST_NAME
REPLICATION_MANAGER_HOST_USER
REPLICATION_MANAGER_HOST_PASSWORD
REPLICATION_MANAGER_HOST_PORT
*/
adminuser := "admin"
adminpassword := "repman"
if user, ok := server.ClusterGroup.APIUsers[adminuser]; ok {
adminpassword = user.Password
}
return "export REPLICATION_MANAGER_HOST_USER=\"" + server.User + "\";export REPLICATION_MANAGER_HOST_PASSWORD=\"" + server.Pass + "\";export MYSQL_ROOT_PASSWORD=\"" + server.Pass + "\";export REPLICATION_MANAGER_URL=\"https://" + server.ClusterGroup.Conf.MonitorAddress + ":" + server.ClusterGroup.Conf.APIPort + "\";export REPLICATION_MANAGER_USER=\"" + adminuser + "\";export REPLICATION_MANAGER_PASSWORD=\"" + adminpassword + "\";export REPLICATION_MANAGER_HOST_NAME=\"" + server.Host + "\";export REPLICATION_MANAGER_HOST_PORT=\"" + server.Port + "\";export REPLICATION_MANAGER_CLUSTER_NAME=\"" + server.ClusterGroup.Name + "\"\n"
}

func (server *ServerMonitor) GetUniversalGtidServerID() uint64 {

if server.IsMariaDB() {
Expand Down
52 changes: 16 additions & 36 deletions cluster/srv_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,11 @@ func (server *ServerMonitor) JobZFSSnapBack() (int64, error) {
func (server *ServerMonitor) JobReseedMyLoader() {

threads := strconv.Itoa(server.ClusterGroup.Conf.BackupLogicalLoadThreads)
dumpCmd := exec.Command(server.ClusterGroup.GetMyLoaderPath(), "--overwrite-tables", "--directory="+server.ClusterGroup.master.GetMasterBackupDirectory(), "--verbose=3", "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+server.ClusterGroup.dbUser, "--password="+server.ClusterGroup.dbPass)

myargs := strings.Split(strings.ReplaceAll(server.ClusterGroup.Conf.BackupMyLoaderOptions, " ", " "), " ")
myargs = append(myargs, "--directory="+server.ClusterGroup.master.GetMasterBackupDirectory(), "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+server.ClusterGroup.dbUser, "--password="+server.ClusterGroup.dbPass)
dumpCmd := exec.Command(server.ClusterGroup.GetMyLoaderPath(), myargs...)

server.ClusterGroup.LogPrintf(LvlInfo, "Command: %s", strings.Replace(dumpCmd.String(), server.ClusterGroup.dbPass, "XXXX", 1))

stdoutIn, _ := dumpCmd.StdoutPipe()
Expand Down Expand Up @@ -664,9 +668,7 @@ func (server *ServerMonitor) JobBackupLogical() error {
}

dumpargs := strings.Split(strings.ReplaceAll("--defaults-file="+file+" "+server.ClusterGroup.getDumpParameter()+" "+dumpslave+" "+usegtid+" "+events, " ", " "), " ")

dumpargs = append(dumpargs, "--apply-slave-statements", "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+server.ClusterGroup.dbUser /*"--log-error="+server.GetMyBackupDirectory()+"dump_error.log"*/)

dumpCmd := exec.Command(server.ClusterGroup.GetMysqlDumpPath(), dumpargs...)

server.ClusterGroup.LogPrintf(LvlInfo, "Command: %s ", strings.Replace(dumpCmd.String(), server.ClusterGroup.dbPass, "XXXX", -1))
Expand Down Expand Up @@ -738,27 +740,11 @@ func (server *ServerMonitor) JobBackupLogical() error {
// --no-schemas --regex '^(?!(mysql))'

threads := strconv.Itoa(server.ClusterGroup.Conf.BackupLogicalDumpThreads)
dumpCmd := exec.Command(server.ClusterGroup.GetMyDumperPath(), "--outputdir="+server.GetMyBackupDirectory(), "--chunk-filesize=1000", "--compress", "--less-locking", "--verbose=3", "--triggers", "--routines", "--events", "--trx-consistency-only", "--kill-long-queries", "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+server.ClusterGroup.dbUser, "--password="+server.ClusterGroup.dbPass)
server.ClusterGroup.LogPrintf(LvlInfo, "%s", strings.Replace(dumpCmd.String(), server.ClusterGroup.dbPass, "XXXX", 1))
/* pr, pw := io.Pipe()
defer pw.Close()
// tell the command to write to our pipe
dumpCmd.Stdout = pw
dumpCmd.Stderr = pw
myargs := strings.Split(strings.ReplaceAll(server.ClusterGroup.Conf.BackupMyLoaderOptions, " ", " "), " ")
myargs = append(myargs, "--outputdir="+server.GetMyBackupDirectory(), "--threads="+threads, "--host="+misc.Unbracket(server.Host), "--port="+server.Port, "--user="+server.ClusterGroup.dbUser, "--password="+server.ClusterGroup.dbPass)
dumpCmd := exec.Command(server.ClusterGroup.GetMyDumperPath(), myargs...)

buf := new(bytes.Buffer)
go func() {
defer pr.Close()
// copy the data written to the PipeReader via the cmd to stdout
if _, err := io.Copy(buf, pr); err != nil {
server.ClusterGroup.LogPrintf(LvlErr, "MyDumper: %s", err)
}
server.ClusterGroup.LogPrintf(LvlInfo, "%s", buf.String())
}()
*/
server.ClusterGroup.LogPrintf(LvlInfo, "%s", strings.Replace(dumpCmd.String(), server.ClusterGroup.dbPass, "XXXX", 1))
stdoutIn, _ := dumpCmd.StdoutPipe()
stderrIn, _ := dumpCmd.StderrPipe()
dumpCmd.Start()
Expand Down Expand Up @@ -884,7 +870,11 @@ func (server *ServerMonitor) JobRunViaSSH() error {
stdout bytes.Buffer
stderr bytes.Buffer
)
filerc, err2 := os.Open(server.Datadir + "/init/init/dbjobs_new")
scriptpath := server.Datadir + "/init/init/dbjobs_new"
if server.GetCluster().GetConf().OnPremiseSSHDbJobScript == "" {
scriptpath = server.GetCluster().GetConf().OnPremiseSSHDbJobScript
}
filerc, err2 := os.Open(scriptpath)
if err2 != nil {
server.ClusterGroup.LogPrintf(LvlErr, "JobRunViaSSH %s", err2)
return errors.New("Cancel dbjob can't open script")
Expand All @@ -893,20 +883,10 @@ func (server *ServerMonitor) JobRunViaSSH() error {
defer filerc.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(filerc)
/*
adminuser := "admin"
adminpassword := "repman"
if user, ok := server.ClusterGroup.APIUsers[adminuser]; ok {
adminpassword = user.Password
}
_, err = client.Cmd("export MYSQL_ROOT_PASSWORD=" + server.Pass).Cmd("export REPLICATION_MANAGER_URL=" + server.ClusterGroup.Conf.MonitorAddress + ":" + server.ClusterGroup.Conf.APIPort).Cmd("export REPLICATION_MANAGER_USER=" + adminuser).Cmd("export REPLICATION_MANAGER_PASSWORD=" + adminpassword).Cmd("export REPLICATION_MANAGER_HOST_NAME=" + server.Host).Cmd("export REPLICATION_MANAGER_HOST_PORT=" + server.Port).Cmd("export REPLICATION_MANAGER_CLUSTER_NAME=" +
server.ClusterGroup.Name).SmartOutput()
if err != nil {
return errors.New("JobRunViaSSH Setup env variables via SSH %s" + err.Error())
}*/

buf2 := strings.NewReader("export MYSQL_ROOT_PASSWORD=\"" + server.Pass + "\"\n")
buf2 := strings.NewReader(server.GetSshEnv())
r := io.MultiReader(buf2, buf)

if client.Shell().SetStdio(r, &stdout, &stderr).Start(); err != nil {
server.ClusterGroup.LogPrintf(LvlErr, "Database jobs run via SSH: %s", stderr.String())
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ type Config struct {
OnPremiseSSHPrivateKey string `mapstructure:"onpremise-ssh-private-key" toml:"onpremise-ssh-private-key" json:"onpremiseSshPrivateKey"`
OnPremiseSSHStartDbScript string `mapstructure:"onpremise-ssh-start-db-script" toml:"onpremise-ssh-start-db-script" json:"onpremiseSshStartDbScript"`
OnPremiseSSHStartProxyScript string `mapstructure:"onpremise-ssh-start-proxy-script" toml:"onpremise-ssh-start-proxy-script" json:"onpremiseSshStartProxyScript"`
OnPremiseSSHDbJobScript string `mapstructure:"onpremise-ssh-db-job-script" toml:"onpremise-ssh-db-job-script" json:"onpremiseSshDbJobScript"`
ProvOpensvcP12Certificate string `mapstructure:"opensvc-p12-certificate" toml:"opensvc-p12-certificat" json:"opensvcP12Certificate"`
ProvOpensvcP12Secret string `mapstructure:"opensvc-p12-secret" toml:"opensvc-p12-secret" json:"opensvcP12Secret"`
ProvOpensvcUseCollectorAPI bool `mapstructure:"opensvc-use-collector-api" toml:"opensvc-use-collector-api" json:"opensvcUseCollectorApi"`
Expand Down Expand Up @@ -498,6 +499,8 @@ type Config struct {
BackupMysqldumpOptions string `mapstructure:"backup-mysqldump-options" toml:"backup-mysqldump-options" json:"backupMysqldumpOptions"`
BackupMyDumperPath string `mapstructure:"backup-mydumper-path" toml:"backup-mydumper-path" json:"backupMydumperPath"`
BackupMyLoaderPath string `mapstructure:"backup-myloader-path" toml:"backup-myloader-path" json:"backupMyloaderPath"`
BackupMyLoaderOptions string `mapstructure:"backup-myloader-options" toml:"backup-myloader-options" json:"backupMyloaderOptions"`
BackupMyDumperOptions string `mapstructure:"backup-mydumper-options" toml:"backup-mydumper-options" json:"backupMyDumperOptions"`
BackupMysqlbinlogPath string `mapstructure:"backup-mysqlbinlog-path" toml:"backup-mysqlbinlog-path" json:"backupMysqlbinlogPath"`
BackupMysqlclientPath string `mapstructure:"backup-mysqlclient-path" toml:"backup-mysqlclient-path" json:"backupMysqlclientgPath"`
BackupBinlogs bool `mapstructure:"backup-binlogs" toml:"backup-binlogs" json:"backupBinlogs"`
Expand Down
4 changes: 4 additions & 0 deletions server/server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ func init() {

monitorCmd.Flags().StringVar(&conf.BackupMyDumperPath, "backup-mydumper-path", "/usr/bin/mydumper", "Path to mydumper binary")
monitorCmd.Flags().StringVar(&conf.BackupMyLoaderPath, "backup-myloader-path", "/usr/bin/myloader", "Path to myloader binary")
monitorCmd.Flags().StringVar(&conf.BackupMyLoaderOptions, "backup-myloader-options", "--overwrite-tables --enable-binlog --verbose=3", "Extra options")
monitorCmd.Flags().StringVar(&conf.BackupMyDumperOptions, "backup-mydumper-options", "--chunk-filesize=1000 --compress --less-locking --verbose=3 --triggers --routines --events --trx-consistency-only --kill-long-queries", "Extra options")
monitorCmd.Flags().StringVar(&conf.BackupMysqldumpPath, "backup-mysqldump-path", "", "Path to mysqldump binary")
monitorCmd.Flags().StringVar(&conf.BackupMysqldumpOptions, "backup-mysqldump-options", "--hex-blob --single-transaction --verbose --all-databases --routines=true --triggers=true --system=all", "Extra options")
monitorCmd.Flags().StringVar(&conf.BackupMysqlbinlogPath, "backup-mysqlbinlog-path", "", "Path to mysqlbinlog binary")
Expand Down Expand Up @@ -434,6 +436,8 @@ func init() {
monitorCmd.Flags().StringVar(&conf.OnPremiseSSHCredential, "onpremise-ssh-credential", "root:", "User:password for ssh if no password using current user private key")
monitorCmd.Flags().StringVar(&conf.OnPremiseSSHStartDbScript, "onpremise-ssh-start-db-script", "", "Run via ssh a custom script to start database")
monitorCmd.Flags().StringVar(&conf.OnPremiseSSHStartProxyScript, "onpremise-ssh-start-proxy-script", "", "Run via ssh a custom script to start proxy")
monitorCmd.Flags().StringVar(&conf.OnPremiseSSHDbJobScript, "onpremise-ssh-db-job-script", "", "Run via ssh a custom script to execute database jobs")

if WithProvisioning == "ON" {
monitorCmd.Flags().StringVar(&conf.ProvDatadirVersion, "prov-db-datadir-version", "10.2", "Empty datadir to deploy for localtest")
monitorCmd.Flags().StringVar(&conf.ProvDiskSystemSize, "prov-db-disk-system-size", "2", "Disk in g for micro service VM")
Expand Down
Loading

0 comments on commit b893fc4

Please sign in to comment.