Skip to content

Commit

Permalink
improve health checker for adapting coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch committed Oct 21, 2022
1 parent 0cedfbb commit 9195347
Show file tree
Hide file tree
Showing 15 changed files with 918 additions and 393 deletions.
3 changes: 3 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type YurtHubConfiguration struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Expand All @@ -89,6 +90,7 @@ type YurtHubConfiguration struct {
KubeletHealthGracePeriod time.Duration
FilterManager *manager.Manager
CertIPs []net.IP
CoordinatorServer *url.URL
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -155,6 +157,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
HeartbeatFailedRetry: options.HeartbeatFailedRetry,
HeartbeatHealthyThreshold: options.HeartbeatHealthyThreshold,
HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds,
HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds,
MaxRequestInFlight: options.MaxRequestInFlight,
JoinToken: options.JoinToken,
RootDir: options.RootDir,
Expand Down
3 changes: 3 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type YurtHubOptions struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewYurtHubOptions() *YurtHubOptions {
HeartbeatFailedRetry: 3,
HeartbeatHealthyThreshold: 2,
HeartbeatTimeoutSeconds: 2,
HeartbeatIntervalSeconds: 10,
MaxRequestInFlight: 250,
RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()),
EnableProfiling: true,
Expand Down Expand Up @@ -148,6 +150,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.")
fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.")
fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.")
fs.IntVar(&o.HeartbeatIntervalSeconds, "heartbeat-interval-seconds", o.HeartbeatIntervalSeconds, " number of seconds for omitting one time heartbeat to remote server.")
fs.IntVar(&o.MaxRequestInFlight, "max-requests-in-flight", o.MaxRequestInFlight, "the maximum number of parallel requests.")
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
Expand Down
52 changes: 46 additions & 6 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package app

import (
"fmt"
"net/url"
"path/filepath"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
Expand All @@ -33,7 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
Expand Down Expand Up @@ -112,10 +115,17 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

var healthChecker healthchecker.HealthChecker
klog.Infof("%d. prepare for health checker clients", trace)
healthCheckerClientsForCloud, _, err := createHealthCheckerClient(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager)
if err != nil {
return fmt.Errorf("failed to create health checker clients, %w", err)
}
trace++

var healthChecker healthchecker.MultipleBackendsHealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checker for remote servers ", trace)
healthChecker, err = healthchecker.NewHealthChecker(cfg, transportManager, stopCh)
healthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, healthCheckerClientsForCloud, stopCh)
if err != nil {
return fmt.Errorf("could not new health checker, %w", err)
}
Expand All @@ -125,11 +135,10 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
// This fake checker will always report that the remote server is healthy.
healthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
healthChecker.Run()
trace++

klog.Infof("%d. new restConfig manager for %s mode", trace, cfg.CertMgrMode)
restConfigMgr, err := rest.NewRestConfigManager(cfg, certManager, healthChecker)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg, certManager, healthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
Expand Down Expand Up @@ -172,7 +181,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, tenantMgr, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh)

if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
Expand Down Expand Up @@ -203,3 +212,34 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("hub agent exited")
return nil
}

// createHealthCheckerClient builds the kubernetes clients that health checkers
// use to reach their backends.
//
// One client is created for every entry of remoteServers and stored in the
// returned map under that server's URL string. Each client uses the transport
// currently provided by tp and a request timeout of heartbeatTimeoutSeconds
// seconds.
//
// coordinatorServer is accepted but not used yet: creation of the coordinator
// client is temporarily disabled (see the commented-out code below), so the
// second return value is always nil.
//
// If a client cannot be created, the returned error names the offending server
// and both client results are nil, so callers never see a half-built map.
func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) {
	var healthCheckerClientForCoordinator kubernetes.Interface
	timeout := time.Duration(heartbeatTimeoutSeconds) * time.Second

	healthCheckerClientsForCloud := make(map[string]kubernetes.Interface, len(remoteServers))
	for _, server := range remoteServers {
		host := server.String()
		c, err := kubernetes.NewForConfig(&rest.Config{
			Host:      host,
			Transport: tp.CurrentTransport(),
			Timeout:   timeout,
		})
		if err != nil {
			return nil, nil, fmt.Errorf("could not create health checker client for %s, %w", host, err)
		}
		healthCheckerClientsForCloud[host] = c
	}

	// The coordinator client is intentionally not created for now; the code is
	// kept here, commented out, until it is enabled again.
	//cfg := &rest.Config{
	//	Host:      coordinatorServer.String(),
	//	Transport: tp.CurrentTransport(),
	//	Timeout:   time.Duration(heartbeatTimeoutSeconds) * time.Second,
	//}
	//c, err := kubernetes.NewForConfig(cfg)
	//if err != nil {
	//	return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err
	//}
	//healthCheckerClientForCoordinator = c

	return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil
}
13 changes: 6 additions & 7 deletions pkg/yurthub/healthchecker/fake_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package healthchecker

import (
"net/url"
"time"
)

// fakeChecker is a fake health checker whose answers are driven by a fixed
// healthy flag and optional per-server settings.
type fakeChecker struct {
// healthy is the value returned by IsHealthy, and by BackendHealthyStatus
// for any server that has no entry in settings.
healthy bool
// settings is keyed by the server URL string.
// NOTE(review): the meaning of the int value is not visible here — confirm
// against BackendHealthyStatus.
settings map[string]int
}

// IsHealthy returns healthy status of server
func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
// BackendHealthyStatus returns healthy status of server
func (fc *fakeChecker) BackendHealthyStatus(server *url.URL) bool {
s := server.String()
if _, ok := fc.settings[s]; !ok {
return fc.healthy
Expand All @@ -45,16 +44,16 @@ func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
return fc.healthy
}

func (fc *fakeChecker) Run() {
return
// IsHealthy returns the fake checker's healthy flag; it does not consult the
// per-server settings.
func (fc *fakeChecker) IsHealthy() bool {
return fc.healthy
}

func (fc *fakeChecker) UpdateLastKubeletLeaseReqTime(time.Time) {
// RenewKubeletLeaseTime is a no-op for the fake checker.
func (fc *fakeChecker) RenewKubeletLeaseTime() {}

// NewFakeChecker creates a fake checker
func NewFakeChecker(healthy bool, settings map[string]int) HealthChecker {
func NewFakeChecker(healthy bool, settings map[string]int) MultipleBackendsHealthChecker {
return &fakeChecker{
settings: settings,
healthy: healthy,
Expand Down
Loading

0 comments on commit 9195347

Please sign in to comment.