From 4db7624cbe5fd15df0deba8257991cc6c6c98004 Mon Sep 17 00:00:00 2001 From: rambohe-ch Date: Tue, 18 Oct 2022 00:02:18 +0800 Subject: [PATCH] improve health checker for adapting coordinator --- cmd/yurthub/app/config/config.go | 3 + cmd/yurthub/app/options/options.go | 3 + cmd/yurthub/app/start.go | 52 ++- pkg/yurthub/healthchecker/fake_checker.go | 13 +- pkg/yurthub/healthchecker/health_checker.go | 309 ++++++------- .../healthchecker/health_checker_test.go | 418 ++++++++++++------ pkg/yurthub/healthchecker/interfaces.go | 50 +++ pkg/yurthub/healthchecker/node_lease.go | 5 +- pkg/yurthub/healthchecker/prober.go | 161 +++++++ pkg/yurthub/healthchecker/prober_test.go | 210 +++++++++ pkg/yurthub/kubernetes/rest/config.go | 6 +- pkg/yurthub/proxy/proxy.go | 12 +- pkg/yurthub/proxy/remote/loadbalancer.go | 48 +- pkg/yurthub/proxy/remote/loadbalancer_test.go | 8 +- pkg/yurthub/proxy/remote/remote.go | 13 +- 15 files changed, 918 insertions(+), 393 deletions(-) create mode 100644 pkg/yurthub/healthchecker/interfaces.go create mode 100644 pkg/yurthub/healthchecker/prober.go create mode 100644 pkg/yurthub/healthchecker/prober_test.go diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index acce2b55cb9..82db73f8489 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -72,6 +72,7 @@ type YurtHubConfiguration struct { HeartbeatFailedRetry int HeartbeatHealthyThreshold int HeartbeatTimeoutSeconds int + HeartbeatIntervalSeconds int MaxRequestInFlight int JoinToken string RootDir string @@ -89,6 +90,7 @@ type YurtHubConfiguration struct { KubeletHealthGracePeriod time.Duration FilterManager *manager.Manager CertIPs []net.IP + CoordinatorServer *url.URL } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -155,6 +157,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { HeartbeatFailedRetry: options.HeartbeatFailedRetry, HeartbeatHealthyThreshold: 
options.HeartbeatHealthyThreshold, HeartbeatTimeoutSeconds: options.HeartbeatTimeoutSeconds, + HeartbeatIntervalSeconds: options.HeartbeatIntervalSeconds, MaxRequestInFlight: options.MaxRequestInFlight, JoinToken: options.JoinToken, RootDir: options.RootDir, diff --git a/cmd/yurthub/app/options/options.go b/cmd/yurthub/app/options/options.go index 9a800a047d1..7670f77c1c2 100644 --- a/cmd/yurthub/app/options/options.go +++ b/cmd/yurthub/app/options/options.go @@ -56,6 +56,7 @@ type YurtHubOptions struct { HeartbeatFailedRetry int HeartbeatHealthyThreshold int HeartbeatTimeoutSeconds int + HeartbeatIntervalSeconds int MaxRequestInFlight int JoinToken string RootDir string @@ -89,6 +90,7 @@ func NewYurtHubOptions() *YurtHubOptions { HeartbeatFailedRetry: 3, HeartbeatHealthyThreshold: 2, HeartbeatTimeoutSeconds: 2, + HeartbeatIntervalSeconds: 10, MaxRequestInFlight: 250, RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()), EnableProfiling: true, @@ -148,6 +150,7 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&o.HeartbeatFailedRetry, "heartbeat-failed-retry", o.HeartbeatFailedRetry, "number of heartbeat request retry after having failed.") fs.IntVar(&o.HeartbeatHealthyThreshold, "heartbeat-healthy-threshold", o.HeartbeatHealthyThreshold, "minimum consecutive successes for the heartbeat to be considered healthy after having failed.") fs.IntVar(&o.HeartbeatTimeoutSeconds, "heartbeat-timeout-seconds", o.HeartbeatTimeoutSeconds, " number of seconds after which the heartbeat times out.") + fs.IntVar(&o.HeartbeatIntervalSeconds, "heartbeat-interval-seconds", o.HeartbeatIntervalSeconds, " number of seconds for omitting one time heartbeat to remote server.") fs.IntVar(&o.MaxRequestInFlight, "max-requests-in-flight", o.MaxRequestInFlight, "the maximum number of parallel requests.") fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.") fs.StringVar(&o.RootDir, "root-dir", 
o.RootDir, "directory path for managing hub agent files(pki, cache etc).") diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 13025dfe7f3..06163ac5bad 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -18,12 +18,15 @@ package app import ( "fmt" + "net/url" "path/filepath" "time" "github.com/spf13/cobra" "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" @@ -33,7 +36,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself" "github.com/openyurtio/openyurt/pkg/yurthub/gc" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" - "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" + hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" "github.com/openyurtio/openyurt/pkg/yurthub/network" "github.com/openyurtio/openyurt/pkg/yurthub/proxy" "github.com/openyurtio/openyurt/pkg/yurthub/server" @@ -112,10 +115,17 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { } trace++ - var healthChecker healthchecker.HealthChecker + klog.Infof("%d. prepare for health checker clients", trace) + healthCheckerClientsForCloud, _, err := createHealthCheckerClient(cfg.HeartbeatTimeoutSeconds, cfg.RemoteServers, cfg.CoordinatorServer, transportManager) + if err != nil { + return fmt.Errorf("failed to create health checker clients, %w", err) + } + trace++ + + var healthChecker healthchecker.MultipleBackendsHealthChecker if cfg.WorkingMode == util.WorkingModeEdge { klog.Infof("%d. 
create health checker for remote servers ", trace) - healthChecker, err = healthchecker.NewHealthChecker(cfg, transportManager, stopCh) + healthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, healthCheckerClientsForCloud, stopCh) if err != nil { return fmt.Errorf("could not new health checker, %w", err) } @@ -125,11 +135,10 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { // This fake checker will always report that the remote server is healthy. healthChecker = healthchecker.NewFakeChecker(true, make(map[string]int)) } - healthChecker.Run() trace++ klog.Infof("%d. new restConfig manager for %s mode", trace, cfg.CertMgrMode) - restConfigMgr, err := rest.NewRestConfigManager(cfg, certManager, healthChecker) + restConfigMgr, err := hubrest.NewRestConfigManager(cfg, certManager, healthChecker) if err != nil { return fmt.Errorf("could not new restConfig manager, %w", err) } @@ -172,7 +181,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { trace++ klog.Infof("%d. 
new reverse proxy handler for remote servers", trace) - yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, tenantMgr, stopCh) + yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, tenantMgr, stopCh) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) @@ -203,3 +212,34 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { klog.Infof("hub agent exited") return nil } + +func createHealthCheckerClient(heartbeatTimeoutSeconds int, remoteServers []*url.URL, coordinatorServer *url.URL, tp transport.Interface) (map[string]kubernetes.Interface, kubernetes.Interface, error) { + var healthCheckerClientForCoordinator kubernetes.Interface + healthCheckerClientsForCloud := make(map[string]kubernetes.Interface) + for i := range remoteServers { + restConf := &rest.Config{ + Host: remoteServers[i].String(), + Transport: tp.CurrentTransport(), + Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, + } + c, err := kubernetes.NewForConfig(restConf) + if err != nil { + return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err + } + healthCheckerClientsForCloud[remoteServers[i].String()] = c + } + + // comment the following code temporarily + //cfg := &rest.Config{ + // Host: coordinatorServer.String(), + // Transport: tp.CurrentTransport(), + // Timeout: time.Duration(heartbeatTimeoutSeconds) * time.Second, + //} + //c, err := kubernetes.NewForConfig(cfg) + //if err != nil { + // return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, err + //} + //healthCheckerClientForCoordinator = c + + return healthCheckerClientsForCloud, healthCheckerClientForCoordinator, nil +} diff --git a/pkg/yurthub/healthchecker/fake_checker.go b/pkg/yurthub/healthchecker/fake_checker.go index 9fee7652196..84a166833a8 100644 --- a/pkg/yurthub/healthchecker/fake_checker.go +++ 
b/pkg/yurthub/healthchecker/fake_checker.go @@ -18,7 +18,6 @@ package healthchecker import ( "net/url" - "time" ) type fakeChecker struct { @@ -26,8 +25,8 @@ type fakeChecker struct { settings map[string]int } -// IsHealthy returns healthy status of server -func (fc *fakeChecker) IsHealthy(server *url.URL) bool { +// BackendHealthyStatus returns healthy status of server +func (fc *fakeChecker) BackendHealthyStatus(server *url.URL) bool { s := server.String() if _, ok := fc.settings[s]; !ok { return fc.healthy @@ -45,16 +44,16 @@ func (fc *fakeChecker) IsHealthy(server *url.URL) bool { return fc.healthy } -func (fc *fakeChecker) Run() { - return +func (fc *fakeChecker) IsHealthy() bool { + return fc.healthy } -func (fc *fakeChecker) UpdateLastKubeletLeaseReqTime(time.Time) { +func (fc *fakeChecker) RenewKubeletLeaseTime() { return } // NewFakeChecker creates a fake checker -func NewFakeChecker(healthy bool, settings map[string]int) HealthChecker { +func NewFakeChecker(healthy bool, settings map[string]int) MultipleBackendsHealthChecker { return &fakeChecker{ settings: settings, healthy: healthy, diff --git a/pkg/yurthub/healthchecker/health_checker.go b/pkg/yurthub/healthchecker/health_checker.go index cddb7ea0cda..88330a74311 100644 --- a/pkg/yurthub/healthchecker/health_checker.go +++ b/pkg/yurthub/healthchecker/health_checker.go @@ -24,93 +24,67 @@ import ( coordinationv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/api/meta" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/metrics" - "github.com/openyurtio/openyurt/pkg/yurthub/transport" ) const ( - heartbeatFrequency = 10 * time.Second + delegateHeartBeat = "openyurt.io/delegate-heartbeat" ) -// HealthChecker is an interface for checking healthy stats of server -type 
HealthChecker interface { - IsHealthy(server *url.URL) bool - UpdateLastKubeletLeaseReqTime(time.Time) - Run() -} - type setNodeLease func(*coordinationv1.Lease) error type getNodeLease func() *coordinationv1.Lease -type healthCheckerManager struct { +type cloudAPIServerHealthChecker struct { sync.RWMutex - remoteServers []*url.URL - checkers map[string]*checker - latestLease *coordinationv1.Lease - sw cachemanager.StorageWrapper - remoteServerIndex int - lastKubeletLeaseReqTime time.Time - healthCheckGracePeriod time.Duration - stopCh <-chan struct{} + remoteServers []*url.URL + probers map[string]BackendProber + latestLease *coordinationv1.Lease + sw cachemanager.StorageWrapper + remoteServerIndex int + heartbeatInterval int } -type checker struct { +type coordinatorHealthChecker struct { sync.RWMutex - remoteServer *url.URL - clusterHealthy bool - healthyThreshold int - healthyCnt int - lastTime time.Time - nodeLease NodeLease - getLastNodeLease getNodeLease - setLastNodeLease setNodeLease - onFailureFunc func(string) + cloudServerHealthChecker HealthChecker + coordinatorProber BackendProber + latestLease *coordinationv1.Lease + heartbeatInterval int } -// NewHealthChecker create an HealthChecker for servers -func NewHealthChecker(cfg *config.YurtHubConfiguration, tp transport.Interface, stopCh <-chan struct{}) (HealthChecker, error) { - if len(cfg.RemoteServers) == 0 { - return nil, fmt.Errorf("no remote servers") +// NewCoordinatorHealthChecker returns a health checker for verifying pool coordinator status. 
+func NewCoordinatorHealthChecker(cfg *config.YurtHubConfiguration, checkerClient kubernetes.Interface, cloudServerHealthChecker HealthChecker, stopCh <-chan struct{}) (HealthChecker, error) { + chc := &coordinatorHealthChecker{ + cloudServerHealthChecker: cloudServerHealthChecker, + heartbeatInterval: cfg.HeartbeatIntervalSeconds, } + chc.coordinatorProber = newProber(checkerClient, + cfg.CoordinatorServer.String(), + cfg.NodeName, + cfg.HeartbeatFailedRetry, + cfg.HeartbeatHealthyThreshold, + cfg.KubeletHealthGracePeriod, + chc.setLastNodeLease, + chc.getLastNodeLease) + go chc.run(stopCh) - hcm := &healthCheckerManager{ - checkers: make(map[string]*checker), - remoteServers: cfg.RemoteServers, - remoteServerIndex: 0, - sw: cfg.StorageWrapper, - healthCheckGracePeriod: cfg.KubeletHealthGracePeriod, - stopCh: stopCh, - } + return chc, nil +} - for _, remoteServer := range cfg.RemoteServers { - c, err := newChecker(cfg, tp, remoteServer, hcm.setLastNodeLease, hcm.getLastNodeLease) - if err != nil { - return nil, err - } - hcm.checkers[remoteServer.String()] = c - if c.check() { - c.setHealthy(true) - } else { - c.setHealthy(false) - klog.Warningf("cluster remote server %v is unhealthy.", remoteServer.String()) - } - } - return hcm, nil +func (chc *coordinatorHealthChecker) IsHealthy() bool { + return chc.coordinatorProber.IsHealthy() } -func (hcm *healthCheckerManager) Run() { - go hcm.healthzCheckLoop(hcm.stopCh) - return +func (chc *coordinatorHealthChecker) RenewKubeletLeaseTime() { + chc.coordinatorProber.RenewKubeletLeaseTime(time.Now()) } -func (hcm *healthCheckerManager) healthzCheckLoop(stopCh <-chan struct{}) { - intervalTicker := time.NewTicker(heartbeatFrequency) +func (chc *coordinatorHealthChecker) run(stopCh <-chan struct{}) { + intervalTicker := time.NewTicker(time.Duration(chc.heartbeatInterval) * time.Second) defer intervalTicker.Stop() for { @@ -119,156 +93,129 @@ func (hcm *healthCheckerManager) healthzCheckLoop(stopCh <-chan struct{}) { 
klog.Infof("exit normally in health check loop.") return case <-intervalTicker.C: - hcm.sync() + chc.coordinatorProber.Probe(ProbePhaseNormal) } } } -func (hcm *healthCheckerManager) sync() { - isKubeletStoped := false - lastKubeletLeaseReqTime := hcm.getLastKubeletLeaseReqTime() - if !lastKubeletLeaseReqTime.IsZero() && hcm.healthCheckGracePeriod > 0 { - isKubeletStoped = time.Now().After(lastKubeletLeaseReqTime.Add(hcm.healthCheckGracePeriod)) +func (chc *coordinatorHealthChecker) setLastNodeLease(lease *coordinationv1.Lease) error { + if lease == nil { + return nil } + chc.latestLease = lease + return nil +} - // Ensure that the node heartbeat can be reported when there is a healthy remote server. - // Try to detect all remote server in a loop, if there is an remote server can update nodeLease, exit the loop. - for i := 0; i < len(hcm.remoteServers); i++ { - c := hcm.getChecker() - - if isKubeletStoped { - klog.Warningf("kubelet does not post lease request for more than %v, stop renew node lease and assume remote server is unhealthy", hcm.healthCheckGracePeriod) - c.markAsUnhealthy() - continue - } - - if c.check() { - break +func (chc *coordinatorHealthChecker) getLastNodeLease() *coordinationv1.Lease { + if chc.latestLease != nil { + if !chc.cloudServerHealthChecker.IsHealthy() { + if chc.latestLease.Annotations == nil { + chc.latestLease.Annotations = make(map[string]string) + } + chc.latestLease.Annotations[delegateHeartBeat] = "true" + } else { + delete(chc.latestLease.Annotations, delegateHeartBeat) } } + + return chc.latestLease } -func (hcm *healthCheckerManager) setLastNodeLease(lease *coordinationv1.Lease) error { - if lease == nil { - return nil +// NewCloudAPIServerHealthChecker returns a health checker for verifying cloud kube-apiserver status. 
+func NewCloudAPIServerHealthChecker(cfg *config.YurtHubConfiguration, healthCheckerClients map[string]kubernetes.Interface, stopCh <-chan struct{}) (MultipleBackendsHealthChecker, error) { + if len(healthCheckerClients) == 0 { + return nil, fmt.Errorf("no remote servers") } - hcm.latestLease = lease - accessor := meta.NewAccessor() - accessor.SetKind(lease, coordinationv1.SchemeGroupVersion.WithKind("Lease").Kind) - accessor.SetAPIVersion(lease, coordinationv1.SchemeGroupVersion.String()) - cacheLeaseKey := fmt.Sprintf(cacheLeaseKeyFormat, lease.Name) - return hcm.sw.Update(cacheLeaseKey, lease) -} - -func (hcm *healthCheckerManager) getLastNodeLease() *coordinationv1.Lease { - return hcm.latestLease -} + hc := &cloudAPIServerHealthChecker{ + probers: make(map[string]BackendProber), + remoteServers: cfg.RemoteServers, + remoteServerIndex: 0, + sw: cfg.StorageWrapper, + heartbeatInterval: cfg.HeartbeatIntervalSeconds, + } -func (hcm *healthCheckerManager) getChecker() *checker { - checker := hcm.checkers[hcm.remoteServers[hcm.remoteServerIndex].String()] - hcm.remoteServerIndex = (hcm.remoteServerIndex + 1) % len(hcm.remoteServers) - return checker + for remoteServer, client := range healthCheckerClients { + hc.probers[remoteServer] = newProber(client, + remoteServer, + cfg.NodeName, + cfg.HeartbeatFailedRetry, + cfg.HeartbeatHealthyThreshold, + cfg.KubeletHealthGracePeriod, + hc.setLastNodeLease, + hc.getLastNodeLease) + } + go hc.run(stopCh) + return hc, nil } -// IsHealthy returns the healthy stats of specified server -func (hcm *healthCheckerManager) IsHealthy(server *url.URL) bool { - if checker, ok := hcm.checkers[server.String()]; ok { - return checker.isHealthy() +func (hc *cloudAPIServerHealthChecker) RenewKubeletLeaseTime() { + currentTime := time.Now() + for _, prober := range hc.probers { + prober.RenewKubeletLeaseTime(currentTime) } - // If there is no checker for server, default unhealthy. 
- return false } -func (hcm *healthCheckerManager) UpdateLastKubeletLeaseReqTime(lastKubeletLeaseReqTime time.Time) { - hcm.Lock() - defer hcm.Unlock() - hcm.lastKubeletLeaseReqTime = lastKubeletLeaseReqTime -} +func (hc *cloudAPIServerHealthChecker) IsHealthy() bool { + for _, prober := range hc.probers { + if prober.IsHealthy() { + return true + } + } -func (hcm *healthCheckerManager) getLastKubeletLeaseReqTime() time.Time { - hcm.RLock() - defer hcm.RUnlock() - return hcm.lastKubeletLeaseReqTime + return false } -func newChecker( - cfg *config.YurtHubConfiguration, - tp transport.Interface, - url *url.URL, - setLastNodeLease setNodeLease, - getLastNodeLease getNodeLease, -) (*checker, error) { - restConf := &rest.Config{ - Host: url.String(), - Transport: tp.CurrentTransport(), - Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second, - } - kubeClient, err := clientset.NewForConfig(restConf) - if err != nil { - return nil, err - } - - nl := NewNodeLease(kubeClient, cfg.NodeName, defaultLeaseDurationSeconds, cfg.HeartbeatFailedRetry) - c := &checker{ - nodeLease: nl, - lastTime: time.Now(), - clusterHealthy: false, - healthyThreshold: cfg.HeartbeatHealthyThreshold, - healthyCnt: 0, - remoteServer: url, - onFailureFunc: tp.Close, - setLastNodeLease: setLastNodeLease, - getLastNodeLease: getLastNodeLease, +// BackendHealthyStatus returns the healthy stats of specified server +func (hc *cloudAPIServerHealthChecker) BackendHealthyStatus(server *url.URL) bool { + if prober, ok := hc.probers[server.String()]; ok { + return prober.IsHealthy() } - return c, nil + // If there is no checker for server, default unhealthy. 
+ return false } -func (c *checker) check() bool { - baseLease := c.getLastNodeLease() - lease, err := c.nodeLease.Update(baseLease) - if err == nil { - if err := c.setLastNodeLease(lease); err != nil { - klog.Errorf("set last node lease fail: %v", err) - } - c.healthyCnt++ - if !c.isHealthy() && c.healthyCnt >= c.healthyThreshold { - c.setHealthy(true) - now := time.Now() - klog.Infof("cluster becomes healthy from %v, unhealthy status lasts %v, remote server: %v", now, now.Sub(c.lastTime), c.remoteServer.String()) - c.lastTime = now - metrics.Metrics.ObserveServerHealthy(c.remoteServer.Host, 1) +func (hc *cloudAPIServerHealthChecker) run(stopCh <-chan struct{}) { + intervalTicker := time.NewTicker(time.Duration(hc.heartbeatInterval) * time.Second) + defer intervalTicker.Stop() + + for { + select { + case <-stopCh: + klog.Infof("exit normally in health check loop.") + return + case <-intervalTicker.C: + // Ensure that the node heartbeat can be reported when there is a healthy remote server. + // Try to detect all remote server in a loop, if there is a remote server can update nodeLease, exit the loop. 
+ for i := 0; i < len(hc.remoteServers); i++ { + p := hc.getProber() + if p.Probe(ProbePhaseNormal) { + break + } + } } - return true } - - klog.Errorf("failed to update lease: %v, remote server %s", err, c.remoteServer.String()) - c.markAsUnhealthy() - return false } -func (c *checker) markAsUnhealthy() { - if c.onFailureFunc != nil { - c.onFailureFunc(c.remoteServer.Host) - } - c.healthyCnt = 0 - if c.isHealthy() { - c.setHealthy(false) - now := time.Now() - klog.Infof("cluster becomes unhealthy from %v, healthy status lasts %v, remote server: %v", time.Now(), now.Sub(c.lastTime), c.remoteServer.String()) - c.lastTime = now - metrics.Metrics.ObserveServerHealthy(c.remoteServer.Host, 0) +func (hc *cloudAPIServerHealthChecker) setLastNodeLease(lease *coordinationv1.Lease) error { + if lease == nil { + return nil } + hc.latestLease = lease + + accessor := meta.NewAccessor() + accessor.SetKind(lease, coordinationv1.SchemeGroupVersion.WithKind("Lease").Kind) + accessor.SetAPIVersion(lease, coordinationv1.SchemeGroupVersion.String()) + cacheLeaseKey := fmt.Sprintf(cacheLeaseKeyFormat, lease.Name) + return hc.sw.Update(cacheLeaseKey, lease) } -func (c *checker) isHealthy() bool { - c.RLock() - defer c.RUnlock() - return c.clusterHealthy +func (hc *cloudAPIServerHealthChecker) getLastNodeLease() *coordinationv1.Lease { + return hc.latestLease } -func (c *checker) setHealthy(healthy bool) { - c.Lock() - defer c.Unlock() - c.clusterHealthy = healthy +func (hc *cloudAPIServerHealthChecker) getProber() BackendProber { + prober := hc.probers[hc.remoteServers[hc.remoteServerIndex].String()] + hc.remoteServerIndex = (hc.remoteServerIndex + 1) % len(hc.remoteServers) + return prober } diff --git a/pkg/yurthub/healthchecker/health_checker_test.go b/pkg/yurthub/healthchecker/health_checker_test.go index 4f784d0aea8..85659c32b89 100644 --- a/pkg/yurthub/healthchecker/health_checker_test.go +++ b/pkg/yurthub/healthchecker/health_checker_test.go @@ -29,10 +29,11 @@ import ( 
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clientfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" - "k8s.io/klog/v2" + "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" ) @@ -41,7 +42,185 @@ var ( rootDir = "/tmp/healthz" ) -func TestHealthyCheckrWithHealthyServer(t *testing.T) { +type fakeMultipleBackendsHealthChecker struct { + status bool +} + +func (fakeChecker *fakeMultipleBackendsHealthChecker) IsHealthy() bool { + return fakeChecker.status +} + +func (fakeChecker *fakeMultipleBackendsHealthChecker) RenewKubeletLeaseTime() { + // do nothing +} + +func (fakeChecker *fakeMultipleBackendsHealthChecker) BackendHealthyStatus(*url.URL) bool { + return fakeChecker.status +} + +func TestNewCoordinatorHealthChecker(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + UID: types.UID("foo-uid"), + }, + } + lease := &coordinationv1.Lease{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "coordination.k8s.io/v1", + Kind: "Lease", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "kube-node-lease", + ResourceVersion: "115883910", + }, + } + var delegateLease *coordinationv1.Lease + gr := schema.GroupResource{Group: "v1", Resource: "lease"} + testcases := map[string]struct { + cloudAPIServerUnhealthy bool + createReactor clienttesting.ReactionFunc + updateReactor clienttesting.ReactionFunc + getReactor clienttesting.ReactionFunc + initHealthy bool + probeHealthy bool + }{ + "both init and probe healthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: true, + }, + 
"init healthy and probe unhealthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewServerTimeout(gr, "update", 1) + }, + getReactor: func() func(action clienttesting.Action) (bool, runtime.Object, error) { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + switch i { + case 1: + return true, nil, apierrors.NewNotFound(gr, "not found") + default: + return true, lease, nil + } + } + }(), + initHealthy: true, + probeHealthy: false, + }, + "init unhealthy and probe unhealthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewServerTimeout(gr, "create", 1) + }, + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewServerTimeout(gr, "update", 1) + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewServerTimeout(gr, "get", 1) + }, + initHealthy: false, + probeHealthy: false, + }, + "init unhealthy and probe healthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewServerTimeout(gr, "create", 1) + }, + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + getReactor: func() func(action clienttesting.Action) (bool, runtime.Object, error) { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + switch { + case i <= 4: + return true, nil, apierrors.NewNotFound(gr, "not found") + default: + return true, lease, nil + } + } + }(), + initHealthy: false, + probeHealthy: true, + }, + "cloud apiserver checker is unhealthy": { + cloudAPIServerUnhealthy: true, + createReactor: func(action 
clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + if updateAction, ok := action.(clienttesting.UpdateAction); ok { + delegateLease, _ = updateAction.GetObject().(*coordinationv1.Lease) + return true, updateAction.GetObject(), nil + } + return true, nil, apierrors.NewServerTimeout(gr, "update", 1) + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: true, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + cfg := &config.YurtHubConfiguration{ + CoordinatorServer: &url.URL{Host: "127.0.0.1:18080"}, + NodeName: node.Name, + HeartbeatFailedRetry: 2, + HeartbeatHealthyThreshold: 1, + HeartbeatIntervalSeconds: 3, + KubeletHealthGracePeriod: 40, + } + + cl := clientfake.NewSimpleClientset(node) + cl.PrependReactor("create", "leases", tt.createReactor) + cl.PrependReactor("update", "leases", tt.updateReactor) + if tt.getReactor != nil { + cl.PrependReactor("get", "leases", tt.getReactor) + } + + cloudChecker := &fakeMultipleBackendsHealthChecker{status: !tt.cloudAPIServerUnhealthy} + stopCh := make(chan struct{}) + checker, _ := NewCoordinatorHealthChecker(cfg, cl, cloudChecker, stopCh) + + initHealthy := checker.IsHealthy() + if initHealthy != tt.initHealthy { + t.Errorf("new coordinator health checker, expect init healthy %v, but got %v", tt.initHealthy, initHealthy) + } + + // wait for the probe completed + time.Sleep(5 * time.Second) + probeHealthy := checker.IsHealthy() + if probeHealthy != tt.probeHealthy { + t.Errorf("after probe, expect probe healthy %v, but got %v", tt.probeHealthy, probeHealthy) + } + + if tt.cloudAPIServerUnhealthy { + if delegateLease == nil || len(delegateLease.Annotations) == 0 { + t.Errorf("expect delegate heartbeat annotaion, but got nil") + } else if v, ok := 
delegateLease.Annotations[delegateHeartBeat]; !ok || v != "true" { + t.Errorf("expect delegate heartbeat annotaion and v is true, but got empty or %v", v) + } + } + + close(stopCh) + }) + } +} + +func TestNewCloudAPIServerHealthChecker(t *testing.T) { node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -63,52 +242,64 @@ func TestHealthyCheckrWithHealthyServer(t *testing.T) { gr := schema.GroupResource{Group: "v1", Resource: "lease"} noConnectionUpdateErr := apierrors.NewServerTimeout(gr, "put", 1) - cases := []struct { - desc string + testcases := map[string]struct { remoteServers []*url.URL - updateReactor []func(action clienttesting.Action) (bool, runtime.Object, error) - getReactor []func(action clienttesting.Action) (bool, runtime.Object, error) - isHealthy [][]bool + createReactor []clienttesting.ReactionFunc + updateReactor []clienttesting.ReactionFunc + getReactor []clienttesting.ReactionFunc + isHealthy []bool + serverHealthy bool }{ - { - desc: "healthy", + "init healthy for one server": { remoteServers: []*url.URL{ {Host: "127.0.0.1:18080"}, }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + createReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, lease, nil }, }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){func(action clienttesting.Action) (bool, runtime.Object, error) { - return true, lease, nil - }}, - isHealthy: [][]bool{{true}}, + updateReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + }, + getReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + }, + isHealthy: []bool{true}, + serverHealthy: true, }, - { - desc: "unhealthy", + "init unhealthy for one server": { remoteServers: []*url.URL{ 
{Host: "127.0.0.1:18080"}, }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + createReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + updateReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, }, - isHealthy: [][]bool{{false}}, + getReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + }, + isHealthy: []bool{false}, + serverHealthy: false, }, - { - desc: "two-healthy", + "both init and probe healthy for two servers": { remoteServers: []*url.URL{ {Host: "127.0.0.1:18080"}, {Host: "127.0.0.1:18081"}, }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + createReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, lease, nil }, @@ -116,7 +307,7 @@ func TestHealthyCheckrWithHealthyServer(t *testing.T) { return true, lease, nil }, }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + updateReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, lease, nil }, @@ -124,125 +315,80 @@ func TestHealthyCheckrWithHealthyServer(t *testing.T) { return true, lease, nil }, }, - isHealthy: [][]bool{{true, true}}, + getReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + }, + isHealthy: []bool{true, true}, + 
serverHealthy: true, }, - { - desc: "two-unhealthy", + "one healthy and the other unhealthy": { remoteServers: []*url.URL{ {Host: "127.0.0.1:18080"}, {Host: "127.0.0.1:18081"}, }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + createReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { - return true, nil, noConnectionUpdateErr + return true, lease, nil }, func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + updateReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { - return true, nil, noConnectionUpdateErr + return true, lease, nil }, func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, }, - isHealthy: [][]bool{{false, false}}, + getReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + }, + isHealthy: []bool{true, false}, + serverHealthy: true, }, - { - desc: "one-healthy one-unhealthy", + "unhealthy two servers": { remoteServers: []*url.URL{ {Host: "127.0.0.1:18080"}, {Host: "127.0.0.1:18081"}, }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + createReactor: []clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, func(action clienttesting.Action) (bool, runtime.Object, error) { - return true, lease, nil + return true, nil, noConnectionUpdateErr }, }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + updateReactor: 
[]clienttesting.ReactionFunc{ func(action clienttesting.Action) (bool, runtime.Object, error) { return true, nil, noConnectionUpdateErr }, func(action clienttesting.Action) (bool, runtime.Object, error) { - return true, lease, nil + return true, nil, noConnectionUpdateErr }, }, - isHealthy: [][]bool{{false, true}}, - }, - { - desc: "healthy to unhealthy", - remoteServers: []*url.URL{ - {Host: "127.0.0.1:18080"}, - }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ - func() func(action clienttesting.Action) (bool, runtime.Object, error) { - i := 0 - return func(action clienttesting.Action) (bool, runtime.Object, error) { - i++ - switch i { - case 1: - return true, lease, nil - default: - return true, nil, noConnectionUpdateErr - } - } - }(), - }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ - func() func(action clienttesting.Action) (bool, runtime.Object, error) { - i := 0 - return func(action clienttesting.Action) (bool, runtime.Object, error) { - i++ - switch i { - case 1: - return true, lease, nil - default: - return true, nil, noConnectionUpdateErr - } - } - }(), - }, - isHealthy: [][]bool{{true}, {false}}, - }, - { - desc: "unhealthy to healthy", - remoteServers: []*url.URL{ - {Host: "127.0.0.1:18080"}, - }, - updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ - func() func(action clienttesting.Action) (bool, runtime.Object, error) { - i := 0 - return func(action clienttesting.Action) (bool, runtime.Object, error) { - i++ - switch i { - case 1: - return true, nil, noConnectionUpdateErr - default: - return true, lease, nil - } - } - }(), - }, - getReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ - func() func(action clienttesting.Action) (bool, runtime.Object, error) { - i := 0 - return func(action clienttesting.Action) (bool, runtime.Object, error) { - i++ - switch i { - case 1: - return true, nil, noConnectionUpdateErr - default: - 
return true, lease, nil - } - } - }(), + getReactor: []clienttesting.ReactionFunc{ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, apierrors.NewNotFound(gr, "not found") + }, }, - isHealthy: [][]bool{{false}, {true}}, + isHealthy: []bool{false, false}, + serverHealthy: false, }, } @@ -251,47 +397,41 @@ func TestHealthyCheckrWithHealthyServer(t *testing.T) { t.Errorf("failed to create disk storage, %v", err) } - for _, tc := range cases { - t.Run(tc.desc, func(t *testing.T) { + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { stopCh := make(chan struct{}) - hcm := &healthCheckerManager{ - checkers: make(map[string]*checker), - remoteServers: tc.remoteServers, - remoteServerIndex: 0, - sw: cachemanager.NewStorageWrapper(store), - stopCh: stopCh, + cfg := &config.YurtHubConfiguration{ + RemoteServers: tt.remoteServers, + StorageWrapper: cachemanager.NewStorageWrapper(store), + NodeName: node.Name, + HeartbeatFailedRetry: 2, + HeartbeatHealthyThreshold: 1, + HeartbeatIntervalSeconds: 3, + KubeletHealthGracePeriod: 40, } - for i, server := range tc.remoteServers { + fakeClients := make(map[string]kubernetes.Interface) + for i := range tt.remoteServers { cl := clientfake.NewSimpleClientset(node) - cl.PrependReactor("update", "leases", tc.updateReactor[i]) - cl.PrependReactor("get", "leases", tc.getReactor[i]) - cl.PrependReactor("create", "leases", tc.updateReactor[i]) - nl := NewNodeLease(cl, "foo", defaultLeaseDurationSeconds, 3) - c := &checker{ - remoteServer: server, - clusterHealthy: tc.isHealthy[0][i], - healthyThreshold: 2, - healthyCnt: 0, - nodeLease: nl, - lastTime: time.Now(), - getLastNodeLease: hcm.getLastNodeLease, - setLastNodeLease: hcm.setLastNodeLease, - } - hcm.checkers[server.String()] = c + cl.PrependReactor("create", "leases", tt.createReactor[i]) + 
cl.PrependReactor("update", "leases", tt.updateReactor[i]) + cl.PrependReactor("get", "leases", tt.getReactor[i]) + fakeClients[tt.remoteServers[i].String()] = cl } - hcm.Run() + checker, _ := NewCloudAPIServerHealthChecker(cfg, fakeClients, stopCh) - for i := range tc.isHealthy { - klog.Infof("begin sleep 16s: %v", time.Now()) - time.Sleep(16 * time.Second) - for j, server := range tc.remoteServers { - if hcm.IsHealthy(server) != tc.isHealthy[i][j] { - t.Fatalf("got %v, expected %v", hcm.IsHealthy(server), tc.isHealthy[i][j]) - } + // wait for the probe completed + time.Sleep(time.Duration(5*len(tt.remoteServers)) * time.Second) + + for i := range tt.remoteServers { + if checker.BackendHealthyStatus(tt.remoteServers[i]) != tt.isHealthy[i] { + t.Errorf("expect server %s healthy status %v, but got %v", tt.remoteServers[i].String(), tt.isHealthy[i], checker.BackendHealthyStatus(tt.remoteServers[i])) } } + if checker.IsHealthy() != tt.serverHealthy { + t.Errorf("expect all servers healthy status %v, but got %v", tt.serverHealthy, checker.IsHealthy()) + } close(stopCh) }) diff --git a/pkg/yurthub/healthchecker/interfaces.go b/pkg/yurthub/healthchecker/interfaces.go new file mode 100644 index 00000000000..18a9ca99db2 --- /dev/null +++ b/pkg/yurthub/healthchecker/interfaces.go @@ -0,0 +1,50 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package healthchecker + +import ( + "net/url" + "time" +) + +// HealthChecker is an interface for checking healthy status of one server +type HealthChecker interface { + // RenewKubeletLeaseTime is used for notifying whether kubelet has stopped or not. + // When kubelet stops renewing its lease, the health checker stops checking + // the healthy status of the remote server and marks the remote server as unhealthy. + RenewKubeletLeaseTime() + IsHealthy() bool +} + +// MultipleBackendsHealthChecker is used for checking healthy status of multiple servers, +// like there are several kube-apiserver instances on the cloud for high availability. +type MultipleBackendsHealthChecker interface { + HealthChecker + BackendHealthyStatus(server *url.URL) bool +} + +// BackendProber is used to send heartbeat to backend and verify backend +// is healthy or not +type BackendProber interface { + // RenewKubeletLeaseTime is used for notifying whether kubelet has stopped or not. + // When kubelet stops renewing its lease, the health checker stops checking + // the healthy status of the remote server and marks the remote server as unhealthy. 
+ RenewKubeletLeaseTime(time time.Time) + // Probe sends one heartbeat to the backend and should be called by the caller at a regular interval + Probe(phase string) bool + IsHealthy() bool +} diff --git a/pkg/yurthub/healthchecker/node_lease.go b/pkg/yurthub/healthchecker/node_lease.go index 8133e97697b..d1118fde50d 100644 --- a/pkg/yurthub/healthchecker/node_lease.go +++ b/pkg/yurthub/healthchecker/node_lease.go @@ -33,9 +33,8 @@ import ( ) const ( - maxBackoff = 1 * time.Second - defaultLeaseDurationSeconds int32 = 40 - cacheLeaseKeyFormat = "/kubelet/leases/kube-node-lease/%s" + maxBackoff = 1 * time.Second + cacheLeaseKeyFormat = "/kubelet/leases/kube-node-lease/%s" ) type NodeLease interface { diff --git a/pkg/yurthub/healthchecker/prober.go b/pkg/yurthub/healthchecker/prober.go new file mode 100644 index 00000000000..0ef8d5fdd73 --- /dev/null +++ b/pkg/yurthub/healthchecker/prober.go @@ -0,0 +1,161 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package healthchecker + +import ( + "sync" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/metrics" +) + +const ( + ProbePhaseInit = "init" + ProbePhaseNormal = "normal" +) + +type prober struct { + sync.RWMutex + remoteServer string + clusterHealthy bool + healthyThreshold int + healthyCnt int + lastTime time.Time + lastRenewTime time.Time + healthCheckGracePeriod time.Duration + nodeLease NodeLease + getLastNodeLease getNodeLease + setLastNodeLease setNodeLease +} + +func newProber( + kubeClient kubernetes.Interface, + remoteServer string, + nodeName string, + heartbeatFailedRetry int, + heartbeatHealthyThreshold int, + healthCheckGracePeriod time.Duration, + setLastNodeLease setNodeLease, + getLastNodeLease getNodeLease, +) BackendProber { + nl := NewNodeLease(kubeClient, nodeName, int32(healthCheckGracePeriod.Seconds()), heartbeatFailedRetry) + p := &prober{ + nodeLease: nl, + lastTime: time.Now(), + clusterHealthy: false, + healthyThreshold: heartbeatHealthyThreshold, + healthCheckGracePeriod: healthCheckGracePeriod, + healthyCnt: 0, + remoteServer: remoteServer, + setLastNodeLease: setLastNodeLease, + getLastNodeLease: getLastNodeLease, + } + + p.Probe(ProbePhaseInit) + return p +} + +func (p *prober) RenewKubeletLeaseTime(renewTime time.Time) { + p.Lock() + defer p.Unlock() + p.lastRenewTime = renewTime +} + +func (p *prober) Probe(phase string) bool { + if p.kubeletStopped() { + p.markAsUnhealthy(phase) + return false + } + + baseLease := p.getLastNodeLease() + lease, err := p.nodeLease.Update(baseLease) + if err == nil { + if err := p.setLastNodeLease(lease); err != nil { + klog.Errorf("failed to store last node lease: %v", err) + } + p.markAsHealthy(phase) + return true + } + + klog.Errorf("failed to probe: %v, remote server %s", err, p.ServerName()) + p.markAsUnhealthy(phase) + return false +} + +func (p *prober) IsHealthy() bool { + p.RLock() + defer p.RUnlock() + return 
p.clusterHealthy +} + +func (p *prober) ServerName() string { + return p.remoteServer +} + +func (p *prober) kubeletStopped() bool { + p.Lock() + defer p.Unlock() + stopped := false + if !p.lastRenewTime.IsZero() && p.healthCheckGracePeriod > 0 { + stopped = time.Now().After(p.lastRenewTime.Add(p.healthCheckGracePeriod)) + } + + return stopped +} + +func (p *prober) setHealthy(healthy bool) { + p.Lock() + defer p.Unlock() + p.clusterHealthy = healthy +} + +func (p *prober) markAsHealthy(phase string) { + p.healthyCnt++ + if phase == ProbePhaseInit { + klog.Infof("healthy status of remote server %s in %s phase is healthy", p.ServerName(), phase) + p.setHealthy(true) + return + } + + if !p.IsHealthy() && p.healthyCnt >= p.healthyThreshold { + p.setHealthy(true) + now := time.Now() + klog.Infof("remote server %s becomes healthy from %v, unhealthy status lasts %v", p.ServerName(), now, now.Sub(p.lastTime)) + p.lastTime = now + metrics.Metrics.ObserveServerHealthy(p.ServerName(), 1) + } +} + +func (p *prober) markAsUnhealthy(phase string) { + p.healthyCnt = 0 + if phase == ProbePhaseInit { + klog.Infof("healthy status of remote server %s in %s phase is unhealthy", p.ServerName(), phase) + p.setHealthy(false) + return + } + + if p.IsHealthy() { + p.setHealthy(false) + now := time.Now() + klog.Infof("remote server %s becomes unhealthy from %v, healthy status lasts %v", p.ServerName(), time.Now(), now.Sub(p.lastTime)) + p.lastTime = now + metrics.Metrics.ObserveServerHealthy(p.ServerName(), 0) + } +} diff --git a/pkg/yurthub/healthchecker/prober_test.go b/pkg/yurthub/healthchecker/prober_test.go new file mode 100644 index 00000000000..3e32b6cb447 --- /dev/null +++ b/pkg/yurthub/healthchecker/prober_test.go @@ -0,0 +1,210 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthchecker + +import ( + "net/url" + "testing" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + clientfake "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" +) + +func TestIsHealthy(t *testing.T) { + var latestLease *coordinationv1.Lease + setLease := func(l *coordinationv1.Lease) error { + latestLease = l + return nil + } + getLease := func() *coordinationv1.Lease { + return latestLease + } + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + UID: types.UID("foo-uid"), + }, + } + + lease := &coordinationv1.Lease{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "coordination.k8s.io/v1", + Kind: "Lease", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "kube-node-lease", + ResourceVersion: "115883910", + }, + } + remoteServer := &url.URL{Host: "127.0.0.1:18080"} + + gr := schema.GroupResource{Group: "v1", Resource: "lease"} + noConnectionUpdateErr := apierrors.NewServerTimeout(gr, "put", 1) + + testcases := map[string]struct { + renewTime int + createReactor clienttesting.ReactionFunc + updateReactor []func(action clienttesting.Action) (bool, runtime.Object, error) + getReactor func(action clienttesting.Action) (bool, runtime.Object, error) + initHealthy bool + probeHealthy []bool + isHealthy []bool + }{ + "init probe healthy": { + createReactor: 
func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + }, + "init probe unhealthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + initHealthy: false, + }, + "normal probe from healthy to healthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + }, + initHealthy: true, + probeHealthy: []bool{true}, + isHealthy: []bool{true}, + }, + "normal probe from healthy to unhealthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: []bool{false}, + isHealthy: []bool{false}, + }, + "normal probe from unhealthy to unhealthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: []bool{false, false}, + isHealthy: []bool{false, false}, + }, + "normal 
probe from unhealthy to healthy": { + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: []bool{false, true, true}, + isHealthy: []bool{false, false, true}, + }, + "kubelet stopped": { + renewTime: -60, + createReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + updateReactor: []func(action clienttesting.Action) (bool, runtime.Object, error){ + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + }, + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, lease, nil + }, + initHealthy: true, + probeHealthy: []bool{false, false}, + isHealthy: []bool{false, false}, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t2 *testing.T) { + cl := clientfake.NewSimpleClientset(node) + cl.PrependReactor("create", "leases", tt.createReactor) + prober := newProber(cl, remoteServer.String(), node.Name, 2, 2, 40*time.Second, setLease, getLease) + if prober.IsHealthy() != tt.initHealthy { + t.Errorf("expect server init healthy %v, but got %v", tt.initHealthy, prober.IsHealthy()) + } + + if tt.renewTime != 0 { + prober.RenewKubeletLeaseTime(time.Now().Add(time.Duration(tt.renewTime) * time.Second)) + } + + if tt.getReactor 
!= nil { + cl.PrependReactor("get", "leases", tt.getReactor) + } + for i := range tt.updateReactor { + cl.PrependReactor("update", "leases", tt.updateReactor[i]) + probeHealthy := prober.Probe(ProbePhaseNormal) + if probeHealthy != tt.probeHealthy[i] { + t.Errorf("expect probe return value should be %v, but got %v", tt.probeHealthy[i], probeHealthy) + } + + if prober.IsHealthy() != tt.isHealthy[i] { + t.Errorf("expect server healthy should be %v after probe, but got %v", tt.isHealthy[i], prober.IsHealthy()) + } + } + }) + } +} diff --git a/pkg/yurthub/kubernetes/rest/config.go b/pkg/yurthub/kubernetes/rest/config.go index 49330ea3ded..d4b5fe01aa4 100644 --- a/pkg/yurthub/kubernetes/rest/config.go +++ b/pkg/yurthub/kubernetes/rest/config.go @@ -31,12 +31,12 @@ import ( type RestConfigManager struct { remoteServers []*url.URL - checker healthchecker.HealthChecker + checker healthchecker.MultipleBackendsHealthChecker certManager interfaces.YurtCertificateManager } // NewRestConfigManager creates a *RestConfigManager object -func NewRestConfigManager(cfg *config.YurtHubConfiguration, certMgr interfaces.YurtCertificateManager, healthChecker healthchecker.HealthChecker) (*RestConfigManager, error) { +func NewRestConfigManager(cfg *config.YurtHubConfiguration, certMgr interfaces.YurtCertificateManager, healthChecker healthchecker.MultipleBackendsHealthChecker) (*RestConfigManager, error) { mgr := &RestConfigManager{ remoteServers: cfg.RemoteServers, checker: healthChecker, @@ -89,7 +89,7 @@ func (rcm *RestConfigManager) getHubselfRestConfig(needHealthyServer bool) *rest // getHealthyServer is used to get a healthy server func (rcm *RestConfigManager) getHealthyServer() *url.URL { for _, server := range rcm.remoteServers { - if rcm.checker.IsHealthy(server) { + if rcm.checker.BackendHealthyStatus(server) { return server } } diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index cd87df49b4f..203bffd7024 100644 --- a/pkg/yurthub/proxy/proxy.go +++ 
b/pkg/yurthub/proxy/proxy.go @@ -18,7 +18,6 @@ package proxy import ( "net/http" - "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" @@ -28,7 +27,6 @@ import ( "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" @@ -55,8 +53,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg *config.YurtHubConfiguration, cacheMgr cachemanager.CacheManager, transportMgr transport.Interface, - healthChecker healthchecker.HealthChecker, - certManager interfaces.YurtCertificateManager, + healthChecker healthchecker.MultipleBackendsHealthChecker, tenantMgr tenant.Interface, stopCh <-chan struct{}) (http.Handler, error) { cfg := &server.Config{ @@ -70,7 +67,6 @@ func NewYurtReverseProxyHandler( cacheMgr, transportMgr, healthChecker, - certManager, yurtHubCfg.FilterManager, stopCh) if err != nil { @@ -81,7 +77,7 @@ func NewYurtReverseProxyHandler( // When yurthub is working in cloud mode, cacheMgr will be set to nil which means the local cache is disabled, // so we don't need to create a LocalProxy. 
if cacheMgr != nil { - localProxy = local.NewLocalProxy(cacheMgr, lb.IsHealthy) + localProxy = local.NewLocalProxy(cacheMgr, healthChecker.IsHealthy) localProxy = local.WithFakeTokenInject(localProxy, yurtHubCfg.SerializerManager) } @@ -126,11 +122,11 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { isKubeletLeaseReq := hubutil.IsKubeletLeaseReq(req) - if !isKubeletLeaseReq && p.loadBalancer.IsHealthy() || p.localProxy == nil { + if !isKubeletLeaseReq && p.checker.IsHealthy() || p.localProxy == nil { p.loadBalancer.ServeHTTP(rw, req) } else { if isKubeletLeaseReq { - p.checker.UpdateLastKubeletLeaseReqTime(time.Now()) + p.checker.RenewKubeletLeaseTime() } p.localProxy.ServeHTTP(rw, req) } diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index c51952dbc43..8158ef3b427 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -25,7 +25,6 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" - "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/transport" @@ -39,6 +38,7 @@ type loadBalancerAlgo interface { type rrLoadBalancerAlgo struct { sync.Mutex + checker healthchecker.MultipleBackendsHealthChecker backends []*RemoteProxy next int } @@ -51,7 +51,7 @@ func (rr *rrLoadBalancerAlgo) PickOne() *RemoteProxy { if len(rr.backends) == 0 { return nil } else if len(rr.backends) == 1 { - if rr.backends[0].IsHealthy() { + if rr.checker.BackendHealthyStatus(rr.backends[0].RemoteServer()) { return rr.backends[0] } return nil @@ -63,7 +63,7 @@ func (rr *rrLoadBalancerAlgo) PickOne() *RemoteProxy { selected := rr.next for i := 0; i < len(rr.backends); i++ { selected 
= (rr.next + i) % len(rr.backends) - if rr.backends[selected].IsHealthy() { + if rr.checker.BackendHealthyStatus(rr.backends[selected].RemoteServer()) { hasFound = true break } @@ -80,6 +80,7 @@ func (rr *rrLoadBalancerAlgo) PickOne() *RemoteProxy { type priorityLoadBalancerAlgo struct { sync.Mutex + checker healthchecker.MultipleBackendsHealthChecker backends []*RemoteProxy } @@ -91,7 +92,7 @@ func (prio *priorityLoadBalancerAlgo) PickOne() *RemoteProxy { if len(prio.backends) == 0 { return nil } else if len(prio.backends) == 1 { - if prio.backends[0].IsHealthy() { + if prio.checker.BackendHealthyStatus(prio.backends[0].RemoteServer()) { return prio.backends[0] } return nil @@ -99,7 +100,7 @@ func (prio *priorityLoadBalancerAlgo) PickOne() *RemoteProxy { prio.Lock() defer prio.Unlock() for i := 0; i < len(prio.backends); i++ { - if prio.backends[i].IsHealthy() { + if prio.checker.BackendHealthyStatus(prio.backends[i].RemoteServer()) { return prio.backends[i] } } @@ -111,14 +112,12 @@ func (prio *priorityLoadBalancerAlgo) PickOne() *RemoteProxy { // LoadBalancer is an interface for proxying http request to remote server // based on the load balance mode(round-robin or priority) type LoadBalancer interface { - IsHealthy() bool ServeHTTP(rw http.ResponseWriter, req *http.Request) } type loadBalancer struct { - backends []*RemoteProxy - algo loadBalancerAlgo - certManager interfaces.YurtCertificateManager + backends []*RemoteProxy + algo loadBalancerAlgo } // NewLoadBalancer creates a loadbalancer for specified remote servers @@ -127,13 +126,12 @@ func NewLoadBalancer( remoteServers []*url.URL, cacheMgr cachemanager.CacheManager, transportMgr transport.Interface, - healthChecker healthchecker.HealthChecker, - certManager interfaces.YurtCertificateManager, + healthChecker healthchecker.MultipleBackendsHealthChecker, filterManager *manager.Manager, stopCh <-chan struct{}) (LoadBalancer, error) { backends := make([]*RemoteProxy, 0, len(remoteServers)) for i := range 
remoteServers { - b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, healthChecker, filterManager, stopCh) + b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, filterManager, stopCh) if err != nil { klog.Errorf("could not new proxy backend(%s), %v", remoteServers[i].String(), err) continue @@ -147,35 +145,19 @@ func NewLoadBalancer( var algo loadBalancerAlgo switch lbMode { case "rr": - algo = &rrLoadBalancerAlgo{backends: backends} + algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker} case "priority": - algo = &priorityLoadBalancerAlgo{backends: backends} + algo = &priorityLoadBalancerAlgo{backends: backends, checker: healthChecker} default: - algo = &rrLoadBalancerAlgo{backends: backends} + algo = &rrLoadBalancerAlgo{backends: backends, checker: healthChecker} } return &loadBalancer{ - backends: backends, - algo: algo, - certManager: certManager, + backends: backends, + algo: algo, }, nil } -func (lb *loadBalancer) IsHealthy() bool { - // both certificate is not expired and - // have at least one healthy remote server, - // load balancer can proxy the request to - // remote server - if lb.certManager.NotExpired() { - for i := range lb.backends { - if lb.backends[i].IsHealthy() { - return true - } - } - } - return false -} - func (lb *loadBalancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { // pick a remote proxy based on the load balancing algorithm. 
rp := lb.algo.PickOne() diff --git a/pkg/yurthub/proxy/remote/loadbalancer_test.go b/pkg/yurthub/proxy/remote/loadbalancer_test.go index ae0b093fb75..ceff84da8d4 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer_test.go +++ b/pkg/yurthub/proxy/remote/loadbalancer_test.go @@ -71,12 +71,12 @@ func TestRrLoadBalancerAlgo(t *testing.T) { u, _ := url.Parse(tc.Servers[i]) backends[i] = &RemoteProxy{ remoteServer: u, - checker: checker, } } rr := &rrLoadBalancerAlgo{ backends: backends, + checker: checker, } for i := range tc.PickBackends { @@ -132,12 +132,12 @@ func TestRrLoadBalancerAlgoWithReverseHealthy(t *testing.T) { u, _ := url.Parse(tc.Servers[i]) backends[i] = &RemoteProxy{ remoteServer: u, - checker: checker, } } rr := &rrLoadBalancerAlgo{ backends: backends, + checker: checker, } for i := range tc.PickBackends { @@ -204,12 +204,12 @@ func TestPriorityLoadBalancerAlgo(t *testing.T) { u, _ := url.Parse(tc.Servers[i]) backends[i] = &RemoteProxy{ remoteServer: u, - checker: checker, } } rr := &priorityLoadBalancerAlgo{ backends: backends, + checker: checker, } for i := range tc.PickBackends { @@ -263,12 +263,12 @@ func TestPriorityLoadBalancerAlgoWithReverseHealthy(t *testing.T) { u, _ := url.Parse(tc.Servers[i]) backends[i] = &RemoteProxy{ remoteServer: u, - checker: checker, } } rr := &priorityLoadBalancerAlgo{ backends: backends, + checker: checker, } for i := range tc.PickBackends { diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index 51ab6d386d1..d8cb8d45afe 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -34,14 +34,12 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) // RemoteProxy is an reverse proxy for remote server type 
RemoteProxy struct { - checker healthchecker.HealthChecker reverseProxy *httputil.ReverseProxy cacheMgr cachemanager.CacheManager remoteServer *url.URL @@ -64,7 +62,6 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { func NewRemoteProxy(remoteServer *url.URL, cacheMgr cachemanager.CacheManager, transportMgr transport.Interface, - healthChecker healthchecker.HealthChecker, filterManager *manager.Manager, stopCh <-chan struct{}) (*RemoteProxy, error) { currentTransport := transportMgr.CurrentTransport() @@ -82,7 +79,6 @@ func NewRemoteProxy(remoteServer *url.URL, bearerUpgradeAwareHandler.UseRequestLocation = true proxyBackend := &RemoteProxy{ - checker: healthChecker, reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), cacheMgr: cacheMgr, remoteServer: remoteServer, @@ -107,6 +103,10 @@ func (rp *RemoteProxy) Name() string { return rp.remoteServer.String() } +func (rp *RemoteProxy) RemoteServer() *url.URL { + return rp.remoteServer +} + func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if httpstream.IsUpgradeRequest(req) { klog.V(5).Infof("get upgrade request %s", req.URL) @@ -121,11 +121,6 @@ func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rp.reverseProxy.ServeHTTP(rw, req) } -// IsHealthy returns healthy status of remote server -func (rp *RemoteProxy) IsHealthy() bool { - return rp.checker.IsHealthy(rp.remoteServer) -} - func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { if resp == nil || resp.Request == nil { klog.Infof("no request info in response, skip cache response")