Skip to content

Commit

Permalink
Concurrent get orchestrator nodes #475
Browse files Browse the repository at this point in the history
  • Loading branch information
emmaloubersac committed Feb 27, 2023
1 parent f6327a9 commit f58e5bf
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 16 deletions.
37 changes: 25 additions & 12 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ type Cluster struct {
WaitingFailover int `json:"waitingFailover"`
Configurator configurator.Configurator `json:"configurator"`
DiffVariables []VariableDiff `json:"diffVariables"`
insideInitNodes bool `json:"-"`
inInitNodes bool `json:"-"`
CanInitNodes bool `json:"canInitNodes"`
errorInitNodes error `json:"-"`
sync.Mutex
crcTable *crc64.Table
}
Expand Down Expand Up @@ -264,6 +266,7 @@ func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.T
cluster.addtableCond = nbc.New()
cluster.altertableCond = nbc.New()
cluster.canFlashBack = true
cluster.CanInitNodes = true
cluster.runOnceAfterTopology = true
cluster.testStopCluster = true
cluster.testStartCluster = true
Expand Down Expand Up @@ -362,23 +365,23 @@ func (cluster *Cluster) Init(conf config.Config, cfgGroup string, tlog *s18log.T
}

func (cluster *Cluster) initOrchetratorNodes() {
if cluster.insideInitNodes {
if cluster.inInitNodes {
return
}
cluster.insideInitNodes = true
defer func() { cluster.insideInitNodes = false }()
cluster.inInitNodes = true
defer func() { cluster.inInitNodes = false }()

//defer cluster.insideInitNodes = false
//cluster.LogPrintf(LvlInfo, "Loading nodes from orchestrator %s", cluster.Conf.ProvOrchestrator)
switch cluster.GetOrchestrator() {
case config.ConstOrchestratorOpenSVC:
cluster.Agents, _ = cluster.OpenSVCGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.OpenSVCGetNodes()
case config.ConstOrchestratorKubernetes:
cluster.Agents, _ = cluster.K8SGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.K8SGetNodes()
case config.ConstOrchestratorSlapOS:
cluster.Agents, _ = cluster.SlapOSGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.SlapOSGetNodes()
case config.ConstOrchestratorLocalhost:
cluster.Agents, _ = cluster.LocalhostGetNodes()
cluster.Agents, cluster.errorInitNodes = cluster.LocalhostGetNodes()
case config.ConstOrchestratorOnPremise:
default:
log.Fatalln("prov-orchestrator not supported", cluster.Conf.ProvOrchestrator)
Expand Down Expand Up @@ -452,7 +455,7 @@ func (cluster *Cluster) Run() {
if !cluster.IsInFailover() {
cluster.initProxies()
}
cluster.initOrchetratorNodes()
go cluster.initOrchetratorNodes()
cluster.ResticFetchRepo()
cluster.runOnceAfterTopology = false
} else {
Expand All @@ -469,7 +472,7 @@ func (cluster *Cluster) Run() {
cluster.InjectProxiesTraffic()
}
if cluster.sme.GetHeartbeats()%30 == 0 {
cluster.initOrchetratorNodes()
go cluster.initOrchetratorNodes()
cluster.MonitorQueryRules()
cluster.MonitorVariablesDiff()
cluster.ResticFetchRepo()
Expand All @@ -479,9 +482,12 @@ func (cluster *Cluster) Run() {
cluster.sme.PreserveState("WARN0093")
cluster.sme.PreserveState("WARN0084")
cluster.sme.PreserveState("WARN0095")
cluster.sme.PreserveState("ERR00082")
cluster.sme.PreserveState("WARN0101")
}
if !cluster.CanInitNodes {
cluster.SetState("ERR00082", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00082"], cluster.errorInitNodes), ErrFrom: "OPENSVC"})
}

if cluster.sme.GetHeartbeats()%36000 == 0 {
cluster.ResticPurgeRepo()
} else {
Expand Down Expand Up @@ -569,8 +575,13 @@ func (cluster *Cluster) StateProcessing() {
}
// cluster.statecloseChan <- s
}
var states []string
if cluster.runOnceAfterTopology {
states = cluster.sme.GetFirstStates()

states := cluster.sme.GetStates()
} else {
states = cluster.sme.GetStates()
}
for i := range states {
cluster.LogPrintf("STATE", states[i])
}
Expand All @@ -581,7 +592,9 @@ func (cluster *Cluster) StateProcessing() {
}

for _, s := range cluster.sme.GetLastOpenedStates() {

cluster.CheckAlert(s)

}

cluster.sme.ClearState()
Expand Down
4 changes: 3 additions & 1 deletion cluster/prov_opensvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ func (cluster *Cluster) OpenSVCGetNodes() ([]Agent, error) {
svc := cluster.OpenSVCConnect()
hosts, err := svc.GetNodes()
if err != nil {
cluster.SetState("ERR00082", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00082"], err), ErrFrom: "OPENSVC"})
cluster.CanInitNodes = false
return nil, err
} else {
cluster.CanInitNodes = true
}
if hosts == nil {
return nil, errors.New("Empty Opensvc Agent list")
Expand Down
3 changes: 1 addition & 2 deletions cluster/srv_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/signal18/replication-manager/utils/dbhelper"
"github.com/signal18/replication-manager/utils/misc"
"github.com/signal18/replication-manager/utils/state"
)

func (server *ServerMonitor) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, SchedulerReceiverPorts string) {
Expand Down Expand Up @@ -230,7 +229,7 @@ func (server *ServerMonitor) SetCredential(url string, user string, pass string)
server.Host, server.Port, server.PostgressDB = misc.SplitHostPortDB(url)
server.IP, err = dbhelper.CheckHostAddr(server.Host)
if err != nil {
server.ClusterGroup.SetState("ERR00062", state.State{ErrType: LvlWarn, ErrDesc: fmt.Sprintf(clusterError["ERR00062"], server.Host, err.Error()), ErrFrom: "TOPO"})
server.GetCluster().LogPrintf(LvlErr, "Cannot resolved DNS for host %s, error: %s", server.Host, err.Error())
}
if server.PostgressDB == "" {
server.PostgressDB = "test"
Expand Down
2 changes: 1 addition & 1 deletion server/server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ func init() {
monitorCmd.Flags().StringVar(&conf.KubeConfig, "kube-config", "", "path to ks8 config file")
monitorCmd.Flags().StringVar(&conf.ProvOpensvcCollectorAccount, "opensvc-collector-account", "/etc/replication-manager/account.yaml", "Openscv collector account")

dbConfig := viper.New()
if conf.ProvOpensvcUseCollectorAPI {
dbConfig := viper.New()
dbConfig.SetConfigType("yaml")
file, err := ioutil.ReadFile(conf.ProvOpensvcCollectorAccount)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions utils/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ func (SM *StateMachine) GetStates() []string {
return log
}

func (SM *StateMachine) GetFirstStates() []string {
var log []string
for key, value := range SM.GetLastOpenedStates() {
log = append(log, fmt.Sprintf("OPENED %s : %s", key, value.ErrDesc))
}

return log
}

func (SM *StateMachine) GetLastResolvedStates() map[string]State {
resolved := make(map[string]State)
SM.Lock()
Expand Down

0 comments on commit f58e5bf

Please sign in to comment.