Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve health checker for adapting coordinator #1032

Merged
merged 1 commit into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type YurtHubConfiguration struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Expand All @@ -89,6 +90,7 @@ type YurtHubConfiguration struct {
KubeletHealthGracePeriod time.Duration
FilterManager *manager.Manager
CertIPs []net.IP
CoordinatorServer *url.URL
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -155,6 +157,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
HeartbeatFailedRetry: options.HeartbeatFailedRetry,
HeartbeatHealthyThreshold: options.HeartbeatHealthyThreshold,
HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds,
HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds,
MaxRequestInFlight: options.MaxRequestInFlight,
JoinToken: options.JoinToken,
RootDir: options.RootDir,
Expand Down
3 changes: 3 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type YurtHubOptions struct {
HeartbeatFailedRetry int
HeartbeatHealthyThreshold int
HeartbeatTimeoutSeconds int
HeartbeatIntervalSeconds int
MaxRequestInFlight int
JoinToken string
RootDir string
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewYurtHubOptions() *YurtHubOptions {
HeartbeatFailedRetry: 3,
HeartbeatHealthyThreshold: 2,
HeartbeatTimeoutSeconds: 2,
HeartbeatIntervalSeconds: 10,
MaxRequestInFlight: 250,
RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()),
EnableProfiling: true,
Expand Down Expand Up @@ -148,6 +150,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.")
fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.")
fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.")
fs.IntVar(&o.HeartbeatIntervalSeconds, "heartbeat-interval-seconds", o.HeartbeatIntervalSeconds, " number of seconds for omitting one time heartbeat to remote server.")
fs.IntVar(&o.MaxRequestInFlight, "max-requests-in-flight", o.MaxRequestInFlight, "the maximum number of parallel requests.")
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
Expand Down
52 changes: 46 additions & 6 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package app

import (
"fmt"
"net/url"
"path/filepath"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
Expand All @@ -33,7 +36,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
Expand Down Expand Up @@ -112,10 +115,17 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

var healthChecker healthchecker.HealthChecker
klog.Infof("%d. prepare for health checker clients", trace)
healthCheckerClientsForCloud, _, err := createHealthCheckerClient(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager)
if err != nil {
return fmt.Errorf("failed to create health checker clients, %w", err)
}
trace++

var healthChecker healthchecker.MultipleBackendsHealthChecker
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. create health checker for remote servers ", trace)
healthChecker, err = healthchecker.NewHealthChecker(cfg, transportManager, stopCh)
healthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, healthCheckerClientsForCloud, stopCh)
if err != nil {
return fmt.Errorf("could not new health checker, %w", err)
}
Expand All @@ -125,11 +135,10 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
// This fake checker will always report that the remote server is healthy.
healthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
}
healthChecker.Run()
trace++

klog.Infof("%d. new restConfig manager for %s mode", trace, cfg.CertMgrMode)
restConfigMgr, err := rest.NewRestConfigManager(cfg, certManager, healthChecker)
restConfigMgr, err := hubrest.NewRestConfigManager(cfg, certManager, healthChecker)
if err != nil {
return fmt.Errorf("could not new restConfig manager, %w", err)
}
Expand Down Expand Up @@ -172,7 +181,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, tenantMgr, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh)

if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
Expand Down Expand Up @@ -203,3 +212,34 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("hub agent exited")
return nil
}

// createHealthCheckerClient builds one kubernetes clientset per remote server
// for use by the health checker. Each clientset is configured with the
// transport currently held by tp and a Timeout of heartbeatTimeoutSeconds
// seconds.
//
// The returned map is keyed by the string form of each remote server URL. The
// second return value is reserved for a coordinator client; it is always nil
// for now because the code that would build it is disabled, which also leaves
// coordinatorServer unused.
//
// If a clientset cannot be created, the error is returned together with the
// clients that were built before the failure.
func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) {
	var coordinatorClient kubernetes.Interface
	cloudClients := make(map[string]kubernetes.Interface)

	for _, server := range remoteServers {
		host := server.String()
		client, err := kubernetes.NewForConfig(&rest.Config{
			Host:      host,
			Transport: tp.CurrentTransport(),
			Timeout:   time.Duration(heartbeatTimeoutSeconds) * time.Second,
		})
		if err != nil {
			return cloudClients, coordinatorClient, err
		}
		cloudClients[host] = client
	}

	// Coordinator client creation is disabled temporarily; the code is kept
	// here so it can be re-enabled later.
	//cfg := &rest.Config{
	//	Host:      coordinatorServer.String(),
	//	Transport: tp.CurrentTransport(),
	//	Timeout:   time.Duration(heartbeatTimeoutSeconds) * time.Second,
	//}
	//c, err := kubernetes.NewForConfig(cfg)
	//if err != nil {
	//	return cloudClients, coordinatorClient, err
	//}
	//coordinatorClient = c

	return cloudClients, coordinatorClient, nil
}
13 changes: 6 additions & 7 deletions pkg/yurthub/healthchecker/fake_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package healthchecker

import (
"net/url"
"time"
)

// fakeChecker is a health checker whose reported status is fixed by the
// values it is constructed with.
type fakeChecker struct {
// healthy is the status returned when the queried server has no entry in
// settings.
healthy bool
// settings holds a per-server integer keyed by the server URL string.
// NOTE(review): how the integer value is interpreted is not visible in this
// view; confirm against the full method body.
settings map[string]int
}

// IsHealthy returns healthy status of server
func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
// BackendHealthyStatus returns healthy status of server
func (fc *fakeChecker) BackendHealthyStatus(server *url.URL) bool {
s := server.String()
if _, ok := fc.settings[s]; !ok {
return fc.healthy
Expand All @@ -45,16 +44,16 @@ func (fc *fakeChecker) IsHealthy(server *url.URL) bool {
return fc.healthy
}

func (fc *fakeChecker) Run() {
return
func (fc *fakeChecker) IsHealthy() bool {
return fc.healthy
}

func (fc *fakeChecker) UpdateLastKubeletLeaseReqTime(time.Time) {
func (fc *fakeChecker) RenewKubeletLeaseTime() {
return
}

// NewFakeChecker creates a fake checker
func NewFakeChecker(healthy bool, settings map[string]int) HealthChecker {
func NewFakeChecker(healthy bool, settings map[string]int) MultipleBackendsHealthChecker {
return &fakeChecker{
settings: settings,
healthy: healthy,
Expand Down
Loading