Skip to content

Commit

Permalink
added heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
roffe committed Feb 3, 2018
1 parent 3eb5461 commit 12aec99
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 17 deletions.
62 changes: 57 additions & 5 deletions app/controllers/health_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ type ControllerHeartbeat struct {
//HealthController reports the health of the controller loops as a http endpoint
type HealthController struct {
HealthPort uint16
Status HealthStats
Config *options.KubeRouterConfig
}

type HealthStats struct {
Healthy bool
MetricsControllerAlive time.Time
NetworkPolicyControllerAlive time.Time
NetworkRoutingControllerAlive time.Time
NetworkServicesControllerAlive time.Time
}

func sendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
glog.Infof("Send Heartbeat from %s", controller)
heartbeat := ControllerHeartbeat{
Component: controller,
Lastheartbeat: time.Now(),
Expand All @@ -31,8 +40,47 @@ func sendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
}

func (hc *HealthController) Handler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("These aren't the droids you're looking for\n"))
if hc.Status.Healthy {
w.WriteHeader(http.StatusOK)
w.Write([]byte("These aren't the droids you're looking for\n"))
} else {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("These are the droids you're looking for\n"))
}
}

func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) {
glog.Infof("Received heartbeat from %s", beat.Component)
switch component := beat.Component; component {
case "NSC":
hc.Status.NetworkServicesControllerAlive = time.Now()
case "NRC":
hc.Status.NetworkRoutingControllerAlive = time.Now()
case "NPC":
hc.Status.NetworkPolicyControllerAlive = time.Now()
case "MC":
hc.Status.MetricsControllerAlive = time.Now()
}
}

func (hc *HealthController) CheckHealth() bool {
glog.V(4).Info("Checking components")
health := true
if time.Since(hc.Status.NetworkPolicyControllerAlive) > hc.Config.IPTablesSyncPeriod+3*time.Second {
glog.Error("Network Policy Controller heartbeat timeout")
health = false
}

if time.Since(hc.Status.NetworkRoutingControllerAlive) > hc.Config.RoutesSyncPeriod+3*time.Second {
glog.Error("Network Routing Controller heartbeat timeout")
health = false
}

if time.Since(hc.Status.NetworkServicesControllerAlive) > hc.Config.IpvsSyncPeriod+3*time.Second {
glog.Error("NetworkService Controller heartbeat timeout")
health = false
}
return health
}

func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
Expand All @@ -53,6 +101,9 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
}()

for {

hc.Status.Healthy = hc.CheckHealth()

select {
case <-stopCh:
glog.Infof("Shutting down health controller")
Expand All @@ -61,16 +112,17 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
}
return nil
case heartbeat := <-healthChan:
glog.Infof("Received heartbeat from %s", heartbeat.Component)
hc.HandleHeartbeat(heartbeat)
case <-t.C:
glog.Infof("Health controller tick")
glog.Info("Health controller tick")
}
}

}

func NewHealthController(config *options.KubeRouterConfig) (*HealthController, error) {
hc := HealthController{
Config: config,
HealthPort: config.HealthPort,
}
return &hc, nil
Expand Down
4 changes: 3 additions & 1 deletion app/controllers/metrics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type MetricsController struct {
}

// Run prometheus metrics controller
func (mc *MetricsController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) error {
func (mc *MetricsController) Run(healthChan chan<- *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
defer wg.Done()
glog.Info("Starting metrics controller")

Expand All @@ -137,6 +137,8 @@ func (mc *MetricsController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) err
}
}()

sendHeartBeat(healthChan, "MC")

<-stopCh
glog.Infof("Shutting down metrics controller")
if err := srv.Shutdown(context.Background()); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion app/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type protocolAndPort struct {
}

// Run runs forver till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
defer wg.Done()
Expand All @@ -126,6 +126,8 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
continue
}

sendHeartBeat(healthChan, "NPC")

select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
Expand Down
4 changes: 3 additions & 1 deletion app/controllers/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const (
)

// Run runs forever until we are notified on stop channel
func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (nrc *NetworkRoutingController) Run(healthChan chan<- *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
cidr, err := utils.GetPodCidrFromCniSpec("/etc/cni/net.d/10-kuberouter.conf")
if err != nil {
glog.Errorf("Failed to get pod CIDR from CNI conf file: %s", err.Error())
Expand Down Expand Up @@ -254,6 +254,8 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr
nrc.syncInternalPeers()
}

sendHeartBeat(healthChan, "NRC")

select {
case <-stopCh:
glog.Infof("Shutting down network routes controller")
Expand Down
18 changes: 9 additions & 9 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ func (kr *KubeRouter) Run() error {
os.Exit(0)
}

//if (kr.Config.HealthPort > 0) && (kr.Config.HealthPort <= 65535) {
hc, err := controllers.NewHealthController(kr.Config)
if err != nil {
return errors.New("Failed to create health controller: " + err.Error())
if (kr.Config.HealthPort > 0) && (kr.Config.HealthPort <= 65535) {
hc, err := controllers.NewHealthController(kr.Config)
if err != nil {
return errors.New("Failed to create health controller: " + err.Error())
}
wg.Add(1)
go hc.Run(healthChan, stopCh, &wg)
}
wg.Add(1)
go hc.Run(healthChan, stopCh, &wg)
//}

if (kr.Config.MetricsPort > 0) && (kr.Config.MetricsPort <= 65535) {
kr.Config.MetricsEnabled = true
Expand All @@ -157,7 +157,7 @@ func (kr *KubeRouter) Run() error {
}

wg.Add(1)
go npc.Run(stopCh, &wg)
go npc.Run(healthChan, stopCh, &wg)
}

if kr.Config.RunRouter {
Expand All @@ -167,7 +167,7 @@ func (kr *KubeRouter) Run() error {
}

wg.Add(1)
go nrc.Run(stopCh, &wg)
go nrc.Run(healthChan, stopCh, &wg)
}

if kr.Config.RunServiceProxy {
Expand Down

0 comments on commit 12aec99

Please sign in to comment.