Emma23022023 #477

Merged: 5 commits, Feb 27, 2023
38 changes: 29 additions & 9 deletions cluster/cluster.go
@@ -170,9 +170,11 @@ type Cluster struct {
WaitingFailover int `json:"waitingFailover"`
Configurator configurator.Configurator `json:"configurator"`
DiffVariables []VariableDiff `json:"diffVariables"`
inInitNodes bool `json:"-"`
CanInitNodes bool `json:"canInitNodes"`
errorInitNodes error `json:"-"`
SqlErrorLog *logsql.Logger `json:"-"`
SqlGeneralLog *logsql.Logger `json:"-"`

sync.Mutex
crcTable *crc64.Table
}
@@ -266,6 +268,7 @@ func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.T
cluster.addtableCond = nbc.New()
cluster.altertableCond = nbc.New()
cluster.canFlashBack = true
cluster.CanInitNodes = true
cluster.runOnceAfterTopology = true
cluster.testStopCluster = true
cluster.testStartCluster = true
@@ -362,18 +365,25 @@ func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.T

return nil
}

func (cluster *Cluster) initOrchetratorNodes() {
if cluster.inInitNodes {
return
}
cluster.inInitNodes = true
defer func() { cluster.inInitNodes = false }()

//defer cluster.insideInitNodes = false
//cluster.LogPrintf(LvlInfo, "Loading nodes from orchestrator %s", cluster.Conf.ProvOrchestrator)
switch cluster.GetOrchestrator() {
case config.ConstOrchestratorOpenSVC:
cluster.Agents, _ = cluster.OpenSVCGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.OpenSVCGetNodes()
case config.ConstOrchestratorKubernetes:
cluster.Agents, _ = cluster.K8SGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.K8SGetNodes()
case config.ConstOrchestratorSlapOS:
cluster.Agents, _ = cluster.SlapOSGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.SlapOSGetNodes()
case config.ConstOrchestratorLocalhost:
cluster.Agents, _ = cluster.LocalhostGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.LocalhostGetNodes()
case config.ConstOrchestratorOnPremise:
default:
log.Fatalln("prov-orchestrator not supported", cluster.Conf.ProvOrchestrator)
@@ -447,7 +457,7 @@ func (cluster *Cluster) Run() {
if !cluster.IsInFailover() {
cluster.initProxies()
}
cluster.initOrchetratorNodes()
go cluster.initOrchetratorNodes()
cluster.ResticFetchRepo()
cluster.runOnceAfterTopology = false
} else {
@@ -464,7 +474,7 @@ func (cluster *Cluster) Run() {
cluster.InjectProxiesTraffic()
}
if cluster.sme.GetHeartbeats()%30 == 0 {
cluster.initOrchetratorNodes()
go cluster.initOrchetratorNodes()
cluster.MonitorQueryRules()
cluster.MonitorVariablesDiff()
cluster.ResticFetchRepo()
@@ -474,9 +484,12 @@ func (cluster *Cluster) Run() {
cluster.sme.PreserveState("WARN0093")
cluster.sme.PreserveState("WARN0084")
cluster.sme.PreserveState("WARN0095")
cluster.sme.PreserveState("ERR00082")
cluster.sme.PreserveState("WARN0101")
}
if !cluster.CanInitNodes {
cluster.SetState("ERR00082", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00082"], cluster.errorInitNodes), ErrFrom: "OPENSVC"})
}

if cluster.sme.GetHeartbeats()%36000 == 0 {
cluster.ResticPurgeRepo()
} else {
@@ -564,8 +577,13 @@ func (cluster *Cluster) StateProcessing() {
}
// cluster.statecloseChan <- s
}
var states []string
if cluster.runOnceAfterTopology {
states = cluster.sme.GetFirstStates()

states := cluster.sme.GetStates()
} else {
states = cluster.sme.GetStates()
}
for i := range states {
cluster.LogPrintf("STATE", states[i])
}
@@ -576,7 +594,9 @@ func (cluster *Cluster) StateProcessing() {
}

for _, s := range cluster.sme.GetLastOpenedStates() {

cluster.CheckAlert(s)

}

cluster.sme.ClearState()
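The cluster.go hunks above make orchestrator node discovery asynchronous: Run() now fires go cluster.initOrchetratorNodes(), and the new inInitNodes flag makes the call non-reentrant so a slow orchestrator API cannot pile up refreshes. Below is a minimal standalone sketch of that guard pattern; the worker type and refreshNodes name are hypothetical, and the sketch uses sync/atomic for the flag, whereas the PR relies on a plain bool toggled from the single monitor loop.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type worker struct {
	busy atomic.Bool // plays the role of cluster.inInitNodes (hypothetical sketch)
}

func (w *worker) refreshNodes() {
	// Skip this tick if a previous refresh is still in flight.
	if !w.busy.CompareAndSwap(false, true) {
		return
	}
	defer w.busy.Store(false)

	time.Sleep(2 * time.Second) // stands in for the orchestrator API call
	fmt.Println("nodes refreshed")
}

func main() {
	var w worker
	for i := 0; i < 3; i++ {
		go w.refreshNodes() // fired from the loop, like go cluster.initOrchetratorNodes()
		time.Sleep(time.Second)
	}
	time.Sleep(3 * time.Second)
}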
4 changes: 3 additions & 1 deletion cluster/prov_opensvc.go
@@ -81,8 +81,10 @@ func (cluster *Cluster) OpenSVCGetNodes() ([]Agent, error) {
svc := cluster.OpenSVCConnect()
hosts, err := svc.GetNodes()
if err != nil {
cluster.SetState("ERR00082", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00082"], err), ErrFrom: "OPENSVC"})
cluster.CanInitNodes = false
return nil, err
} else {
cluster.CanInitNodes = true
}
if hosts == nil {
return nil, errors.New("Empty Opensvc Agent list")
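Combined with the Run() hunk above, provisioning errors are no longer reported from inside OpenSVCGetNodes(); the goroutine records the outcome in CanInitNodes and errorInitNodes, and the synchronous monitor pass raises the ERR00082 warning on its next tick. A hedged sketch of that record-then-report flow, with hypothetical names (monitor, probe, tick):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// monitor stands in for Cluster; probe stands in for svc.GetNodes().
type monitor struct {
	canInitNodes   bool
	errorInitNodes error
}

func (m *monitor) probe() ([]string, error) {
	return nil, errors.New("collector unreachable")
}

// refresh runs in a goroutine and only records the result.
func (m *monitor) refresh() {
	_, err := m.probe()
	m.errorInitNodes = err
	m.canInitNodes = err == nil
}

// tick is the synchronous monitor pass that surfaces the recorded error,
// mirroring cluster.SetState("ERR00082", ...) in Run().
func (m *monitor) tick() {
	if !m.canInitNodes {
		fmt.Printf("WARNING ERR00082: could not fetch orchestrator nodes: %v\n", m.errorInitNodes)
	}
}

func main() {
	m := &monitor{canInitNodes: true}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); m.refresh() }()
	wg.Wait() // join before reading; the PR instead re-checks the flags on later monitor ticks
	m.tick()
}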
2 changes: 1 addition & 1 deletion cluster/srv_get.go
@@ -618,7 +618,7 @@ func (server *ServerMonitor) GetDatabaseBasedir() string {
} else if server.ClusterGroup.Conf.ProvOrchestrator == config.ConstOrchestratorSlapOS {
return server.SlapOSDatadir
}
return ""
return server.Datadir
}

func (server *ServerMonitor) GetTablePK(schema string, table string) (string, error) {
3 changes: 1 addition & 2 deletions cluster/srv_set.go
@@ -21,7 +21,6 @@ import (

"github.com/signal18/replication-manager/utils/dbhelper"
"github.com/signal18/replication-manager/utils/misc"
"github.com/signal18/replication-manager/utils/state"
)

func (server *ServerMonitor) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, SchedulerReceiverPorts string) {
@@ -230,7 +229,7 @@ func (server *ServerMonitor) SetCredential(url string, user string, pass string)
server.Host, server.Port, server.PostgressDB = misc.SplitHostPortDB(url)
server.IP, err = dbhelper.CheckHostAddr(server.Host)
if err != nil {
server.ClusterGroup.SetState("ERR00062", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["ERR00062"], server.Host, err.Error()), ErrFrom: "TOPO"})
server.GetCluster().LogPrintf(LvlErr, "Cannot resolved DNS for host %s, error: %s", server.Host, err.Error())
}
if server.PostgressDB == "" {
server.PostgressDB = "test"
2 changes: 1 addition & 1 deletion cluster/srv_snd.go
@@ -24,7 +24,7 @@ func (server *ServerMonitor) GetDatabaseMetrics() []graphite.Metric {
replacer := strings.NewReplacer("`", "", "?", "", " ", "_", ".", "-", "(", "-", ")", "-", "/", "_", "<", "-", "'", "-", "\"", "-")
hostname := replacer.Replace(server.Variables["HOSTNAME"])
var metrics []graphite.Metric
if server.IsSlave {
if server.IsSlave && server.GetCluster().GetTopology() != topoMultiMasterWsrep && server.GetCluster().GetTopology() != topoMultiMasterGrouprep {
m := graphite.NewMetric(fmt.Sprintf("mysql.%s.mysql_slave_status_seconds_behind_master", hostname), fmt.Sprintf("%d", server.SlaveStatus.SecondsBehindMaster.Int64), time.Now().Unix())
metrics = append(metrics, m)
metrics = append(metrics, graphite.NewMetric(fmt.Sprintf("mysql.%s.mysql_slave_status_exec_master_log_pos", hostname), fmt.Sprintf("%s", server.SlaveStatus.ExecMasterLogPos.String), time.Now().Unix()))
3 changes: 2 additions & 1 deletion config/config.go
@@ -345,6 +345,7 @@ type Config struct {
ProvOpensvcP12Certificate string `mapstructure:"opensvc-p12-certificate" toml:"opensvc-p12-certificat" json:"opensvcP12Certificate"`
ProvOpensvcP12Secret string `mapstructure:"opensvc-p12-secret" toml:"opensvc-p12-secret" json:"opensvcP12Secret"`
ProvOpensvcUseCollectorAPI bool `mapstructure:"opensvc-use-collector-api" toml:"opensvc-use-collector-api" json:"opensvcUseCollectorApi"`
ProvOpensvcCollectorAccount string `mapstructure:"opensvc-collector-account" toml:"opensvc-collector-account" json:"opensvcCollectorAccount"`
ProvRegister bool `mapstructure:"opensvc-register" toml:"opensvc-register" json:"opensvcRegister"`
ProvAdminUser string `mapstructure:"opensvc-admin-user" toml:"opensvc-admin-user" json:"opensvcAdminUser"`
ProvUser string `mapstructure:"opensvc-user" toml:"opensvc-user" json:"opensvcUser"`
@@ -529,7 +530,7 @@ type ConfigVariableType struct {
Label string `json:"label"`
}

//Compliance created in OpenSVC collector and exported as JSON
// Compliance created in OpenSVC collector and exported as JSON
type Compliance struct {
Filtersets []struct {
ID uint `json:"id"`
4 changes: 2 additions & 2 deletions server/server.go
@@ -99,7 +99,7 @@ const (
ConstMonitorStandby string = "S"
)

//Unused in server still used in client cmd line
// Unused in server still used in client cmd line
type Settings struct {
Enterprise string `json:"enterprise"`
Interactive string `json:"interactive"`
@@ -572,7 +572,7 @@ func (repman *ReplicationManager) Run() error {
repman.OpenSVC.CertsDERSecret = repman.Conf.ProvOpensvcP12Secret
err := repman.OpenSVC.LoadCert(repman.Conf.ProvOpensvcP12Certificate)
if err != nil {
log.Printf("Cannot load OpenSVC cluster certificate %s ", err)
log.Fatalf("Cannot load OpenSVC cluster certificate %s ", err)
}
}
//don't Bootstrap opensvc to speedup test
28 changes: 12 additions & 16 deletions server/server_monitor.go
@@ -519,26 +519,22 @@ func init() {
monitorCmd.Flags().StringVar(&conf.ProvOpensvcP12Certificate, "opensvc-p12-certificate", "/etc/replication-manager/s18.p12", "Certicate used for socket vs collector API opensvc-host refer to a cluster VIP")
monitorCmd.Flags().BoolVar(&conf.ProvOpensvcUseCollectorAPI, "opensvc-use-collector-api", false, "Use the collector API instead of cluster VIP")
monitorCmd.Flags().StringVar(&conf.KubeConfig, "kube-config", "", "path to ks8 config file")
monitorCmd.Flags().StringVar(&conf.ProvOpensvcCollectorAccount, "opensvc-collector-account", "/etc/replication-manager/account.yaml", "Openscv collector account")

dbConfig := viper.New()
dbConfig.SetConfigType("yaml")
file, err := ioutil.ReadFile(conf.ConfDir + "/account.yaml")
if err != nil {
file, err = ioutil.ReadFile(conf.ShareDir + "/opensvc/account.yaml")
if conf.ProvOpensvcUseCollectorAPI {
dbConfig := viper.New()
dbConfig.SetConfigType("yaml")
file, err := ioutil.ReadFile(conf.ProvOpensvcCollectorAccount)
if err != nil {
log.Errorf("%s", err)
log.Errorf("Provide OpenSVC account file : %s", err)

}

dbConfig.ReadConfig(bytes.NewBuffer(file))
conf.ProvUser = dbConfig.Get("email").(string) + ":" + dbConfig.Get("hashed_password").(string)
crcTable := crc64.MakeTable(crc64.ECMA)
conf.ProvCodeApp = "ns" + strconv.FormatUint(crc64.Checksum([]byte(dbConfig.Get("email").(string)), crcTable), 10)
}
dbConfig.ReadConfig(bytes.NewBuffer(file))
// log.Printf("OpenSVC user account: %s", dbConfig.Get("email").(string))
conf.ProvUser = dbConfig.Get("email").(string) + ":" + dbConfig.Get("hashed_password").(string)
crcTable := crc64.MakeTable(crc64.ECMA)
conf.ProvCodeApp = "ns" + strconv.FormatUint(crc64.Checksum([]byte(dbConfig.Get("email").(string)), crcTable), 10)
// log.Printf("OpenSVC code application: %s", conf.ProvCodeApp)
// } else {
// monitorCmd.Flags().StringVar(&conf.ProvUser, "opensvc-user", "replication-manager@localhost.localdomain:mariadb", "OpenSVC collector provisioning user")
// monitorCmd.Flags().StringVar(&conf.ProvCodeApp, "opensvc-codeapp", "MariaDB", "OpenSVC collector applicative code")
// }

}
}
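The init() hunk above gates the OpenSVC collector account lookup behind opensvc-use-collector-api and reads it from the new --opensvc-collector-account path instead of the hard-coded ConfDir/ShareDir locations. A minimal sketch of that load step, assuming a YAML file carrying email and hashed_password keys as in share/opensvc/account.yaml; the literal path and the main wrapper are illustrative only:

package main

import (
	"fmt"
	"hash/crc64"
	"os"
	"strconv"

	"github.com/spf13/viper"
)

func main() {
	// Path normally comes from --opensvc-collector-account
	// (default /etc/replication-manager/account.yaml).
	v := viper.New()
	v.SetConfigType("yaml")

	f, err := os.Open("/etc/replication-manager/account.yaml")
	if err != nil {
		fmt.Println("Provide OpenSVC account file:", err)
		return
	}
	defer f.Close()
	if err := v.ReadConfig(f); err != nil {
		fmt.Println("Cannot parse account file:", err)
		return
	}

	// ProvUser is "email:hashed_password" and ProvCodeApp is "ns" + crc64(email),
	// as composed in the diff above.
	email := v.GetString("email")
	provUser := email + ":" + v.GetString("hashed_password")
	crcTable := crc64.MakeTable(crc64.ECMA)
	codeApp := "ns" + strconv.FormatUint(crc64.Checksum([]byte(email), crcTable), 10)
	fmt.Println(provUser, codeApp)
}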
2 changes: 1 addition & 1 deletion share/opensvc/account.yaml
@@ -3,4 +3,4 @@ state: enabled
access:
site:
login: 'true'
hashed_password: repman
hashed_password: xxxx
9 changes: 9 additions & 0 deletions utils/state/state.go
@@ -298,6 +298,15 @@ func (SM *StateMachine) GetStates() []string {
return log
}

func (SM *StateMachine) GetFirstStates() []string {
var log []string
for key, value := range SM.GetLastOpenedStates() {
log = append(log, fmt.Sprintf("OPENED %s : %s", key, value.ErrDesc))
}

return log
}

func (SM *StateMachine) GetLastResolvedStates() map[string]State {
resolved := make(map[string]State)
SM.Lock()