
Commit

resolved the merge conflict
emmaloubersac committed Feb 27, 2023
2 parents f58e5bf + 205cefd commit f71aea6
Showing 27 changed files with 468 additions and 150 deletions.
1 change: 1 addition & 0 deletions arbitrator/arbitrator_cmd.go
@@ -56,6 +56,7 @@ func init() {
rootCmd.Flags().StringVar(&conf.KeyPath, "keypath", "/etc/replication-manager/.replication-manager.key", "Encryption key file path")
rootCmd.PersistentFlags().BoolVar(&conf.Verbose, "verbose", false, "Print detailed execution info")
rootCmd.PersistentFlags().StringVar(&memprofile, "memprofile", "/tmp/repmgr.mprof", "Write a memory profile to a file readable by pprof")
rootCmd.PersistentFlags().StringVar(&conf.WorkingDir, "monitoring-datadir", "/var/lib/replication-manager", "Path to write temporary and persistent files")

}
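For readers unfamiliar with how these flags are wired, below is a minimal, hypothetical cobra program (not part of this commit) showing how a persistent --monitoring-datadir flag binds to a config field. The flag name, default path, and help text come from the line added above; the command and the conf struct are invented for illustration.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// conf mirrors only the config field touched by the new flag (assumed shape).
var conf struct {
	WorkingDir string
}

var rootCmd = &cobra.Command{
	Use: "arbitrator",
	Run: func(cmd *cobra.Command, args []string) {
		fmt.Println("monitoring datadir:", conf.WorkingDir)
	},
}

func init() {
	// Persistent flags are inherited by subcommands, matching the diff above.
	rootCmd.PersistentFlags().StringVar(&conf.WorkingDir, "monitoring-datadir",
		"/var/lib/replication-manager", "Path to write temporary and persistent files")
}

func main() {
	if err := rootCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}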

50 changes: 26 additions & 24 deletions cluster/cluster.go
@@ -30,8 +30,7 @@ import (
"github.com/signal18/replication-manager/utils/s18log"
"github.com/signal18/replication-manager/utils/state"
log "github.com/sirupsen/logrus"
logsqlerr "github.com/sirupsen/logrus"
logsqlgen "github.com/sirupsen/logrus"
logsql "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)

@@ -174,6 +173,8 @@ type Cluster struct {
inInitNodes bool `json:"-"`
CanInitNodes bool `json:"canInitNodes"`
errorInitNodes error `json:"-"`
SqlErrorLog *logsql.Logger `json:"-"`
SqlGeneralLog *logsql.Logger `json:"-"`
sync.Mutex
crcTable *crc64.Table
}
@@ -254,7 +255,8 @@ const (

// Init initial cluster definition
func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.TermLog, log *s18log.HttpLog, termlength int, runUUID string, repmgrVersion string, repmgrHostname string, key []byte) error {

cluster.SqlErrorLog = logsql.New()
cluster.SqlGeneralLog = logsql.New()
cluster.crcTable = crc64.MakeTable(crc64.ECMA) // http://golang.org/pkg/hash/crc64/#pkg-constants
cluster.switchoverChan = make(chan bool)
// should use buffered channels or it will block
@@ -313,34 +315,34 @@ func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.T
MaxSize: cluster.Conf.LogRotateMaxSize,
MaxBackups: cluster.Conf.LogRotateMaxBackup,
MaxAge: cluster.Conf.LogRotateMaxAge,
Level: logsqlerr.DebugLevel,
Formatter: &logsqlerr.TextFormatter{
Level: logsql.DebugLevel,
Formatter: &logsql.TextFormatter{
DisableColors: true,
TimestampFormat: "2006-01-02 15:04:05",
FullTimestamp: true,
},
})
if err != nil {
logsqlerr.WithError(err).Error("Can't init error sql log file")
cluster.SqlErrorLog.WithError(err).Error("Can't init error sql log file")
}
logsqlerr.AddHook(hookerr)
cluster.SqlErrorLog.AddHook(hookerr)

hookgen, err := s18log.NewRotateFileHook(s18log.RotateFileConfig{
Filename: cluster.WorkingDir + "/sql_general.log",
MaxSize: cluster.Conf.LogRotateMaxSize,
MaxBackups: cluster.Conf.LogRotateMaxBackup,
MaxAge: cluster.Conf.LogRotateMaxAge,
Level: logsqlerr.DebugLevel,
Formatter: &logsqlgen.TextFormatter{
Level: logsql.DebugLevel,
Formatter: &logsql.TextFormatter{
DisableColors: true,
TimestampFormat: "2006-01-02 15:04:05",
FullTimestamp: true,
},
})
if err != nil {
logsqlgen.WithError(err).Error("Can't init general sql log file")
cluster.SqlGeneralLog.WithError(err).Error("Can't init general sql log file")
}
logsqlgen.AddHook(hookgen)
cluster.SqlGeneralLog.AddHook(hookgen)
cluster.LoadAPIUsers()
// createKeys does nothing yet
cluster.createKeys()
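A self-contained sketch (not from this commit) of the logging pattern the hunk above moves to: each cluster owns two dedicated logrus loggers, one for SQL errors and one for general SQL, instead of logging through the former package-level logsqlerr/logsqlgen aliases. Plain files stand in here for the s18log rotate-file hooks used in the real code; paths and field values are illustrative.

package main

import (
	"errors"
	"os"

	logsql "github.com/sirupsen/logrus"
)

func main() {
	// One dedicated logger per stream instead of the shared package-level logger.
	sqlErrorLog := logsql.New()
	sqlGeneralLog := logsql.New()

	// Plain files as illustrative destinations; the committed code attaches
	// s18log rotate-file hooks here instead (error handling omitted).
	errFile, _ := os.OpenFile("/tmp/sql_error.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	genFile, _ := os.OpenFile("/tmp/sql_general.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	sqlErrorLog.SetOutput(errFile)
	sqlGeneralLog.SetOutput(genFile)

	// Same formatter settings as the RotateFileConfig in the hunk above.
	formatter := &logsql.TextFormatter{
		DisableColors:   true,
		TimestampFormat: "2006-01-02 15:04:05",
		FullTimestamp:   true,
	}
	sqlErrorLog.SetFormatter(formatter)
	sqlGeneralLog.SetFormatter(formatter)

	// Structured fields, as used by LogSqlGeneralPrintf / LogSqlErrorPrintf.
	sqlGeneralLog.WithFields(logsql.Fields{"cluster": "cluster1", "server": "db1:3306", "module": "Monitor"}).
		Info("SELECT 1")
	sqlErrorLog.WithFields(logsql.Fields{"cluster": "cluster1", "server": "db1:3306", "module": "Monitor"}).
		WithError(errors.New("example error")).
		Error("query failed")
}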
@@ -435,8 +437,8 @@ func (cluster *Cluster) Run() {
for k, v := range cluster.Servers {
cluster.LogPrintf(LvlDbg, "Server [%d]: URL: %-15s State: %6s PrevState: %6s", k, v.URL, v.State, v.PrevState)
}
if cluster.master != nil {
cluster.LogPrintf(LvlDbg, "Master [ ]: URL: %-15s State: %6s PrevState: %6s", cluster.master.URL, cluster.master.State, cluster.master.PrevState)
if cluster.GetMaster() != nil {
cluster.LogPrintf(LvlDbg, "Master [ ]: URL: %-15s State: %6s PrevState: %6s", cluster.master.URL, cluster.GetMaster().State, cluster.GetMaster().PrevState)
for k, v := range cluster.slaves {
cluster.LogPrintf(LvlDbg, "Slave [%d]: URL: %-15s State: %6s PrevState: %6s", k, v.URL, v.State, v.PrevState)
}
@@ -736,7 +738,7 @@ func (cluster *Cluster) FailoverForce() error {

}
}
if cluster.master == nil {
if cluster.GetMaster() == nil {
cluster.LogPrintf(LvlErr, "Could not find a failed server in the hosts list")
return errors.New("ERROR: Could not find a failed server in the hosts list")
}
@@ -882,21 +884,21 @@ func (cluster *Cluster) MonitorSchema() {
if !cluster.Conf.MonitorSchemaChange {
return
}
if cluster.master == nil {
if cluster.GetMaster() == nil {
return
}
if cluster.master.State == stateFailed || cluster.master.State == stateMaintenance || cluster.master.State == stateUnconn {
if cluster.GetMaster().State == stateFailed || cluster.GetMaster().State == stateMaintenance || cluster.GetMaster().State == stateUnconn {
return
}
if cluster.master.Conn == nil {
if cluster.GetMaster().Conn == nil {
return
}
cluster.sme.SetMonitorSchemaState()
cluster.master.Conn.SetConnMaxLifetime(3595 * time.Second)
cluster.GetMaster().Conn.SetConnMaxLifetime(3595 * time.Second)

tables, tablelist, logs, err := dbhelper.GetTables(cluster.master.Conn, cluster.master.DBVersion)
cluster.LogSQL(logs, err, cluster.master.URL, "Monitor", LvlErr, "Could not fetch master tables %s", err)
cluster.master.Tables = tablelist
tables, tablelist, logs, err := dbhelper.GetTables(cluster.GetMaster().Conn, cluster.GetMaster().DBVersion)
cluster.LogSQL(logs, err, cluster.GetMaster().URL, "Monitor", LvlErr, "Could not fetch master tables %s", err)
cluster.GetMaster().Tables = tablelist

var tableCluster []string
var duplicates []*ServerMonitor
@@ -910,7 +912,7 @@ func (cluster *Cluster) MonitorSchema() {

duplicates = append(duplicates, cluster.GetMaster())
tableCluster = append(tableCluster, cluster.GetName())
oldtable, err := cluster.master.GetTableFromDict(t.TableSchema + "." + t.TableName)
oldtable, err := cluster.GetMaster().GetTableFromDict(t.TableSchema + "." + t.TableName)
haschanged := false
if err != nil {
if err.Error() == "Empty" {
@@ -957,7 +959,7 @@ func (cluster *Cluster) MonitorSchema() {
}
cluster.DBIndexSize = totindexsize
cluster.DBTableSize = tottablesize
cluster.master.DictTables = tables
cluster.GetMaster().DictTables = tables
cluster.sme.RemoveMonitorSchemaState()
}

@@ -1014,7 +1016,7 @@ func (cluster *Cluster) LostArbitration(realmasterurl string) {
}
if cluster.Conf.ArbitrationFailedMasterScript != "" {
cluster.LogPrintf(LvlInfo, "Calling abitration failed for master script")
out, err := exec.Command(cluster.Conf.ArbitrationFailedMasterScript, cluster.master.Host, cluster.master.Port).CombinedOutput()
out, err := exec.Command(cluster.Conf.ArbitrationFailedMasterScript, cluster.GetMaster().Host, cluster.GetMaster().Port).CombinedOutput()
if err != nil {
cluster.LogPrintf(LvlErr, "%s", err)
}
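Several call sites in this file switch from reading the private cluster.master field to the GetMaster() accessor. The accessor itself is not part of this diff; the sketch below is only a guess at the kind of nil-tolerant, lock-guarded getter such a change usually targets, with stub types so it compiles on its own. The real replication-manager implementation may differ.

package main

import (
	"fmt"
	"sync"
)

// Minimal stand-ins for the real types; the actual definitions are far richer.
type ServerMonitor struct{ URL string }

type Cluster struct {
	sync.Mutex
	master  *ServerMonitor
	vmaster *ServerMonitor
}

// Hypothetical shape of the accessor used throughout the diff.
func (cluster *Cluster) GetMaster() *ServerMonitor {
	cluster.Lock() // Cluster embeds sync.Mutex, as shown in the struct hunk above
	defer cluster.Unlock()
	if cluster.master == nil {
		return cluster.vmaster // may itself be nil, so callers still check
	}
	return cluster.master
}

func main() {
	c := &Cluster{}
	if c.GetMaster() == nil {
		fmt.Println("no master yet") // same nil checks as in MonitorSchema / FailoverForce
	}
}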
2 changes: 1 addition & 1 deletion cluster/cluster_chk.go
@@ -130,7 +130,7 @@ func (cluster *Cluster) isMasterFailed() bool {
func (cluster *Cluster) isMaxMasterFailedCountReached() bool {
// the failed count is not unlimited

if cluster.master.FailCount >= cluster.Conf.MaxFail {
if cluster.GetMaster().FailCount >= cluster.Conf.MaxFail {
cluster.sme.AddState("WARN0023", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0023"]), ErrFrom: "CHECK"})
return true
} else {
5 changes: 3 additions & 2 deletions cluster/cluster_fail.go
@@ -1216,7 +1216,7 @@ func (cluster *Cluster) VMasterFailover(fail bool) bool {
cluster.failoverPreScript(fail)

// Phase 2: Reject updates and sync slaves on switchover
if fail == false {
if fail == false && cluster.GetTopology() != topoMultiMasterWsrep {
cluster.LogPrintf(LvlInfo, "Rejecting updates on %s (old master)", cluster.oldMaster.URL)
cluster.oldMaster.freeze()
}
@@ -1323,7 +1323,8 @@
cluster.LogPrintf(LvlErr, "Could not set old master as read-write, %s", err)
}
}
if cluster.Conf.SwitchDecreaseMaxConn {
// Galera does not freeze the old master because of bug https://jira.mariadb.org/browse/MDEV-9134
if cluster.Conf.SwitchDecreaseMaxConn && cluster.GetTopology() != topoMultiMasterWsrep {
logs, err := dbhelper.SetMaxConnections(cluster.oldMaster.Conn, cluster.oldMaster.maxConn, cluster.oldMaster.DBVersion)
cluster.LogSQL(logs, err, cluster.oldMaster.URL, "MasterFailover", LvlErr, "Could not set max connections on %s %s", cluster.oldMaster.URL, err)
}
10 changes: 9 additions & 1 deletion cluster/cluster_get.go
@@ -296,6 +296,14 @@ func (cluster *Cluster) GetGComm() string {
} else {
gcomms = append(gcomms, server.Host+":"+strconv.Itoa(cluster.Conf.MultiMasterGrouprepPort))
}

}
// For bootstrapping the Galera cluster on the first node
if cluster.AllServersFailed() && cluster.GetTopology() == topoMultiMasterWsrep {
return ""
}
if cluster.GetTopology() == topoMultiMasterWsrep {
return strings.Join(gcomms, ",") + "?pc.wait_prim=yes"
}
return strings.Join(gcomms, ",")
}
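To make the new return paths concrete, here is a tiny runnable example of the address GetGComm would now produce for a wsrep topology, and of the empty address used to bootstrap the first node when every server is failed. Hosts, the port, and the wsrep_cluster_address framing are assumptions; the diff only shows the string being built.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical node list; 4567 is the conventional Galera replication port.
	gcomms := []string{"db1:4567", "db2:4567", "db3:4567"}

	// Wsrep topology: join the peers and ask joining nodes to wait for a
	// primary component before serving, matching the "?pc.wait_prim=yes"
	// suffix added above.
	addr := strings.Join(gcomms, ",") + "?pc.wait_prim=yes"
	fmt.Println("wsrep_cluster_address = gcomm://" + addr)
	// -> wsrep_cluster_address = gcomm://db1:4567,db2:4567,db3:4567?pc.wait_prim=yes

	// Bootstrap: when AllServersFailed() is true, GetGComm returns "" so the
	// first node can start with an empty gcomm:// list and form a new cluster.
	fmt.Println("wsrep_cluster_address = gcomm://")
}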
@@ -308,7 +316,7 @@ func (cluster *Cluster) getOnePreferedMaster() *ServerMonitor {
if cluster.Conf.LogLevel > 2 {
cluster.LogPrintf(LvlDbg, "Lookup if server: %s is preferred master: %s", server.URL, cluster.Conf.PrefMaster)
}
if strings.Contains(cluster.Conf.PrefMaster, server.URL) {
if server.IsPrefered() {
return server
}
}
16 changes: 7 additions & 9 deletions cluster/cluster_log.go
@@ -18,8 +18,6 @@ import (
"github.com/nsf/termbox-go"
"github.com/signal18/replication-manager/utils/s18log"
log "github.com/sirupsen/logrus"
logsqlerr "github.com/sirupsen/logrus"
logsqlgen "github.com/sirupsen/logrus"
)

// Log levels
@@ -52,12 +50,12 @@ func (cluster *Cluster) LogSQL(logs string, err error, url string, from string,
}
if logs != "" {
if err != nil {
cluster.LogSQLErrorPrintf(LvlInfo, url, err, from, logs, fmt.Sprintf(format, args...))
cluster.LogSqlErrorPrintf(LvlInfo, url, err, from, logs, fmt.Sprintf(format, args...))
}
if from != "Monitor" {
cluster.LogSQLGeneralPrintf(LvlInfo, url, from, logs)
cluster.LogSqlGeneralPrintf(LvlInfo, url, from, logs)
} else if cluster.Conf.LogSQLInMonitoring {
cluster.LogSQLGeneralPrintf(LvlInfo, url, from, logs)
cluster.LogSqlGeneralPrintf(LvlInfo, url, from, logs)
}
}
}
@@ -92,7 +90,7 @@ func (cluster *Cluster) LogPrint(msg ...interface{}) {
}
}

func (cluster *Cluster) LogSQLGeneralPrintf(level string, url string, from string, format string) {
func (cluster *Cluster) LogSqlGeneralPrintf(level string, url string, from string, format string) {
stamp := fmt.Sprint(time.Now().Format("2006/01/02 15:04:05"))
msg := s18log.HttpMessage{
Group: cluster.Name,
@@ -101,10 +99,10 @@ func (cluster *Cluster) LogSqlGeneralPrintf(level string, url string, from strin
Text: format,
}
cluster.SQLGeneralLog.Add(msg)
logsqlgen.WithFields(log.Fields{"cluster": cluster.Name, "server": url, "module": from}).Infof(format)
cluster.SqlGeneralLog.WithFields(log.Fields{"cluster": cluster.Name, "server": url, "module": from}).Infof(format)
}

func (cluster *Cluster) LogSQLErrorPrintf(level string, url string, err error, from string, logs string, format string) {
func (cluster *Cluster) LogSqlErrorPrintf(level string, url string, err error, from string, logs string, format string) {
stamp := fmt.Sprint(time.Now().Format("2006/01/02 15:04:05"))
msg := s18log.HttpMessage{
Group: cluster.Name,
@@ -113,7 +111,7 @@ func (cluster *Cluster) LogSqlErrorPrintf(level string, url string, err error, f
Text: logs,
}
cluster.SQLErrorLog.Add(msg)
logsqlerr.WithFields(log.Fields{"cluster": cluster.Name, "server": url, "module": from, "error": err, "sql": logs}).Errorf(format)
cluster.SqlErrorLog.WithFields(log.Fields{"cluster": cluster.Name, "server": url, "module": from, "error": err, "sql": logs}).Errorf(format)
}

func (cluster *Cluster) LogUpdate(line int, level string, format string, args ...interface{}) int {
12 changes: 11 additions & 1 deletion cluster/cluster_topo.go
@@ -268,7 +268,7 @@ func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
sro++
}
}
if sro > 1 && cluster.GetTopology() != topoMultiMasterGrouprep {
if sro > 1 && cluster.GetTopology() != topoMultiMasterGrouprep && cluster.GetTopology() != topoMultiMasterWsrep {
cluster.SetState("WARN0004", state.State{ErrType: "WARNING", ErrDesc: "RO server count > 1 in 2 node multi-master mode. switching to preferred master.", ErrFrom: "TOPO"})
server := cluster.getOnePreferedMaster()
if server != nil {
@@ -277,6 +277,16 @@ func (cluster *Cluster) TopologyDiscover(wcg *sync.WaitGroup) error {
cluster.SetState("WARN0006", state.State{ErrType: "WARNING", ErrDesc: "Multi-master need a preferred master.", ErrFrom: "TOPO"})
}
}
if sro == len(cluster.Servers) && cluster.GetTopology() == topoMultiMasterWsrep {
if cluster.GetMaster() == nil {
cluster.SetState("WARN0006", state.State{ErrType: "WARNING", ErrDesc: "Wsrep cluster need a leader electing one", ErrFrom: "TOPO"})
server := cluster.getOnePreferedMaster()
if server != nil {
server.ClusterGroup.vmaster = server
server.SetReadWrite()
}
} // no master
} // end RO servers = number of nodes and galera
}

if cluster.slaves != nil && !cluster.Conf.MultiMasterGrouprep {
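A condensed, self-contained restatement (not the committed code) of the new wsrep branch above: when every node reports read-only and no leader exists, one preferred node is promoted to virtual master and writes are re-enabled on it. The types and field names below are stand-ins for the repository's ServerMonitor and cluster state.

package main

import "fmt"

// Stand-in for a monitored node; only the fields needed for the sketch.
type node struct {
	name      string
	readOnly  bool
	preferred bool
}

// electLeader mirrors the intent of the new branch: if every node is
// read-only and no leader exists, pick a preferred node as virtual master.
func electLeader(nodes []*node) *node {
	for _, n := range nodes {
		if !n.readOnly {
			return nil // a writable node already acts as leader
		}
	}
	// The real code first raises WARN0006, then asks getOnePreferedMaster().
	for _, n := range nodes {
		if n.preferred {
			n.readOnly = false // the real code calls server.SetReadWrite()
			return n           // and records it as server.ClusterGroup.vmaster
		}
	}
	return nil
}

func main() {
	nodes := []*node{
		{name: "db1", readOnly: true, preferred: true},
		{name: "db2", readOnly: true},
		{name: "db3", readOnly: true},
	}
	if leader := electLeader(nodes); leader != nil {
		fmt.Println("virtual master:", leader.name)
	}
}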
2 changes: 1 addition & 1 deletion cluster/configurator/configurator.go
@@ -567,7 +567,7 @@ func (configurator *Configurator) GenerateDatabaseConfig(Datadir string, Cluster
if configurator.IsFilterInDBTags("docker") && configurator.ClusterConfig.ProvOrchestrator != config.ConstOrchestratorLocalhost {
if configurator.IsFilterInDBTags("wsrep") {
// if galera, don't customize system files
if strings.Contains(content, "./.system") {
if strings.Contains(content, "./.system") && !(strings.Contains(content, "exclude") || strings.Contains(content, "ignore")) {
content = ""
}
} else {
3 changes: 3 additions & 0 deletions cluster/prov_opensvc_db.go
@@ -417,6 +417,9 @@ func (cluster *Cluster) OpenSVCGetNetSection() map[string]string {
svcnet["type"] = "cni"
svcnet["netns"] = "container#01"
svcnet["network"] = cluster.Conf.ProvNetCNICluster
if cluster.GetTopology() == topoMultiMasterWsrep {
svcnet["wait_dns"] = "15s"
}
return svcnet
} else if cluster.Conf.ProvType == "docker" {
svcnet["type"] = "docker"
6 changes: 3 additions & 3 deletions cluster/prx_haproxy.go
@@ -357,11 +357,11 @@ func (proxy *HaproxyProxy) Refresh() error {
PrxByteOut: line[9],
PrxLatency: line[61],
})
if (srv.State == stateSlaveErr || srv.State == stateRelayErr || srv.State == stateSlaveLate || srv.State == stateRelayLate || srv.IsIgnored()) && line[17] == "UP" {
if (srv.State == stateSlaveErr || srv.State == stateRelayErr || srv.State == stateSlaveLate || srv.State == stateRelayLate || srv.IsIgnored()) && line[17] == "UP" || srv.State == stateWsrepLate || srv.State == stateWsrepDonor {
cluster.LogPrintf(LvlInfo, "HaProxy detecting broken resplication and UP state in haproxy %s drain server %s", proxy.Host+":"+proxy.Port, srv.URL)
haRuntime.SetDrain(srv.Id, cluster.Conf.HaproxyAPIReadBackend)
}
if (srv.State == stateSlave || srv.State == stateRelay) && line[17] == "DRAIN" && !srv.IsIgnored() {
if (srv.State == stateSlave || srv.State == stateRelay || (srv.State == stateWsrep && !srv.IsLeader())) && line[17] == "DRAIN" && !srv.IsIgnored() {
cluster.LogPrintf(LvlInfo, "HaProxy valid resplication and DRAIN state in haproxy %s enable traffic on server %s", proxy.Host+":"+proxy.Port, srv.URL)
haRuntime.SetReady(srv.Id, cluster.Conf.HaproxyAPIReadBackend)
}
@@ -389,7 +389,7 @@ func (proxy *HaproxyProxy) Refresh() error {
}
if !foundMasterInStat {
master := cluster.GetMaster()
if master != nil && master.State == stateMaster {
if master != nil && master.IsLeader() {
res, err := haRuntime.SetMaster(master.Host, master.Port)
cluster.LogPrintf(LvlInfo, "Haproxy have leader in cluster but not in haproxy %s fixing it to master %s return %s", proxy.Host+":"+proxy.Port, master.URL, res)
if err != nil {
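The two conditions in the Refresh hunk above are dense, so here is a stand-alone sketch spelling out when a backend is drained and when it is put back in service, including the new Galera states. The state values, the server struct, and the helper names are invented stand-ins; only the logic mirrors the diff.

package main

import "fmt"

// Stand-ins for the server states used in prx_haproxy.go; values are illustrative.
const (
	stateSlave      = "Slave"
	stateSlaveErr   = "SlaveErr"
	stateSlaveLate  = "SlaveLate"
	stateRelay      = "Relay"
	stateRelayErr   = "RelayErr"
	stateRelayLate  = "RelayLate"
	stateWsrep      = "Wsrep"
	stateWsrepLate  = "WsrepLate"
	stateWsrepDonor = "WsrepDonor"
)

type server struct {
	state   string
	ignored bool
	leader  bool
}

// shouldDrain mirrors the first condition: drain a backend that HAProxy still
// reports UP while replication is broken, and drain Galera nodes that are late
// or acting as SST donor (as written in the diff, the two wsrep states drain
// regardless of the current HAProxy state, due to operator precedence).
func shouldDrain(s server, haproxyState string) bool {
	broken := s.state == stateSlaveErr || s.state == stateRelayErr ||
		s.state == stateSlaveLate || s.state == stateRelayLate || s.ignored
	return (broken && haproxyState == "UP") ||
		s.state == stateWsrepLate || s.state == stateWsrepDonor
}

// shouldReady mirrors the second condition: re-enable traffic on a drained
// backend once it is a healthy replica, or a healthy non-leader Galera node.
func shouldReady(s server, haproxyState string) bool {
	healthy := s.state == stateSlave || s.state == stateRelay ||
		(s.state == stateWsrep && !s.leader)
	return healthy && haproxyState == "DRAIN" && !s.ignored
}

func main() {
	fmt.Println(shouldDrain(server{state: stateWsrepDonor}, "DRAIN")) // true
	fmt.Println(shouldReady(server{state: stateWsrep}, "DRAIN"))      // true
}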
7 changes: 5 additions & 2 deletions cluster/prx_mariadbshardproxy.go
@@ -195,10 +195,13 @@ func (proxy *MariadbShardProxy) Refresh() error {

return errors.New("Sharding proxy refresh no database monitor yet initialize")
}
wg := new(sync.WaitGroup)
wg.Add(1)
go proxy.ShardProxy.Ping(wg)
wg.Wait()
err := proxy.ShardProxy.Refresh()
if err != nil {
proxy.ClusterGroup.LogPrintf(LvlErr, "Sharding proxy refresh error (%s)", err)

//proxy.ClusterGroup.LogPrintf(LvlErr, "Sharding proxy refresh error (%s)", err)
return err
}
proxy.Version = proxy.ShardProxy.Variables["VERSION"]
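The refresh now pings the shard proxy and waits before calling Refresh. Ping presumably takes a *sync.WaitGroup because the monitor also fires it for many servers in parallel; the runnable sketch below (the ping body is invented) shows both the single-server use from the hunk above and the concurrent use that signature allows.

package main

import (
	"fmt"
	"sync"
	"time"
)

// ping stands in for ServerMonitor.Ping, which signals completion on the WaitGroup.
func ping(name string, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(10 * time.Millisecond) // pretend to check the connection
	fmt.Println(name, "pinged")
}

func main() {
	// Single-server case, as in MariadbShardProxy.Refresh above.
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go ping("shardproxy", wg)
	wg.Wait() // block until the connection state is refreshed, then call Refresh()

	// The same signature lets the monitor ping a whole cluster concurrently.
	wg = new(sync.WaitGroup)
	for _, s := range []string{"db1", "db2", "db3"} {
		wg.Add(1)
		go ping(s, wg)
	}
	wg.Wait()
}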