From d457d131b0f4d2b33f9dabf66b1e3944a74082ea Mon Sep 17 00:00:00 2001 From: hxcGit Date: Thu, 29 Sep 2022 12:39:16 +0800 Subject: [PATCH] fix cloud-edge connection judgment Signed-off-by: hxcGit --- cmd/yurthub/app/start.go | 2 +- pkg/yurthub/otaupdate/ota.go | 63 ++++++++++++++----------------- pkg/yurthub/otaupdate/ota_test.go | 31 ++------------- pkg/yurthub/server/server.go | 12 +++--- 4 files changed, 38 insertions(+), 70 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 3430446ea15..13025dfe7f3 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { cfg.YurtSharedFactory.Start(stopCh) klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr) - s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker) + s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr) if err != nil { return fmt.Errorf("could not create hub server, %w", err) } diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 01ef27a3de7..32227738b3e 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "net/url" "github.com/gorilla/mux" corev1 "k8s.io/api/core/v1" @@ -32,22 +31,16 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ) +type OTAHandler func(kubernetes.Interface, string) http.Handler + // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, - servers []*url.URL) http.Handler { +func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Pre-check if edge yurthub node is connected to the cloud - if !isEdgeCloudConnected(checker, servers) { - klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud") - WriteErr(w, "Get pod list is not allowed when edge is disconnected to cloud", http.StatusForbidden) - return - } - podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ - FieldSelector: "spec.nodeName=" + nodeName, + FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), }) if err != nil { klog.Errorf("Get pod list failed, %v", err) @@ -67,16 +60,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchec } // UpdatePod update a specifc pod(namespace/podname) to the latest version -func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, - servers []*url.URL) http.Handler { +func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Pre-check if edge yurthub node is connected to the cloud - if !isEdgeCloudConnected(checker, servers) { - klog.Errorf("Apply update is not allowed when edge is disconnected to cloud") - WriteErr(w, "Apply update is not allowed when edge is disconnected to cloud", http.StatusForbidden) - return - } - params := mux.Vars(r) namespace := params["ns"] podName := params["podname"] @@ -94,7 +79,7 @@ func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthch } // Successfully apply update, response 200 - WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %q/%q", namespace, podName))) + WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %v/%v", namespace, podName))) }) } @@ -131,7 +116,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st return err, false } - klog.Infof("Start updating pod: %q/%q", namespace, podName) + klog.Infof("Start updating pod: %v/%v", namespace, podName) return nil, true } @@ -141,17 +126,6 @@ func encodePodList(podList *corev1.PodList) ([]byte, error) { return runtime.Encode(codec, podList) } -// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is -// regarded as connected. Otherwise, it is regarded as disconnected and return false. -func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool { - for _, server := range remoteServers { - if checker.IsHealthy(server) { - return true - } - } - return false -} - // WriteErr writes the http status and the error string on the response func WriteErr(w http.ResponseWriter, errReason string, httpStatus int) { w.WriteHeader(httpStatus) @@ -175,3 +149,24 @@ func WriteJSONResponse(w http.ResponseWriter, data []byte) { klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err) } } + +// HealthyCheck checks if cloud-edge is disconnected before ota update handle, ota update is not allowed when disconnected +func HealthyCheck(rest *rest.RestConfigManager, nodeName string, handler OTAHandler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + restCfg := rest.GetRestConfig(true) + if restCfg == nil { + klog.Infof("Get pod list is not allowed when edge is disconnected to cloud") + WriteErr(w, "OTA update is not allowed when edge is disconnected to cloud", http.StatusForbidden) + return + } + + clientSet, err := kubernetes.NewForConfig(restCfg) + if err != nil { + klog.Errorf("Get client set failed: %v", err) + WriteErr(w, "Get client set failed", http.StatusInternalServerError) + return + } + + handler(clientSet, nodeName).ServeHTTP(w, r) + }) +} diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index 29a6084b3df..71ddd0ba8f3 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -19,7 +19,6 @@ package otaupdate import ( "net/http" "net/http/httptest" - "net/url" "testing" "github.com/gorilla/mux" @@ -29,25 +28,6 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" -) - -var ( - healthyServers = []*url.URL{ - {Host: "127.0.0.1:18080"}, - } - - unHealthyServers = []*url.URL{ - {Host: "127.0.0.1:18081"}, - } - - healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{ - "http://127.0.0.1:8080": 1, - }) - - unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{ - "http://127.0.0.1:8081": 1, - }) ) func newPod(podName string) *corev1.Pod { @@ -93,15 +73,10 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) + GetPods(clientset, "").ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) - - // Cloud-Edge network disconnected - rr = httptest.NewRecorder() - GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req) - assert.Equal(t, http.StatusForbidden, rr.Code) } func TestUpdatePod(t *testing.T) { @@ -117,7 +92,7 @@ func TestUpdatePod(t *testing.T) { podName: "updatablePod", pod: newPodWithCondition("updatablePod", corev1.ConditionTrue), expectedCode: http.StatusOK, - expectedData: "Start updating pod \"default\"/\"updatablePod\"", + expectedData: "Start updating pod default/updatablePod", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", @@ -148,7 +123,7 @@ func TestUpdatePod(t *testing.T) { req = mux.SetURLVars(req, vars) rr := httptest.NewRecorder() - UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) + UpdatePod(clientset, "").ServeHTTP(rr, req) assert.Equal(t, test.expectedCode, rr.Code) assert.Equal(t, test.expectedData, rr.Body.String()) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 104a1206226..7da09cbd23a 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -37,7 +37,6 @@ import ( "github.com/openyurtio/openyurt/pkg/util/certmanager" certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" - "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" ) @@ -62,8 +61,7 @@ type yurtHubServer struct { func NewYurtHubServer(cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, proxyHandler http.Handler, - rest *rest.RestConfigManager, - checker healthchecker.HealthChecker) (Server, error) { + rest *rest.RestConfigManager) (Server, error) { hubMux := mux.NewRouter() restCfg := rest.GetRestConfig(false) clientSet, err := kubernetes.NewForConfig(restCfg) @@ -71,7 +69,7 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, klog.Errorf("cannot create the client set: %v", err) return nil, err } - registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker) + registerHandlers(hubMux, cfg, certificateMgr, rest) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -160,7 +158,7 @@ func (s *yurtHubServer) Run() { // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, - clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) { + rest *rest.RestConfigManager) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -176,9 +174,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET") + c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET") c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", - ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST") + ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST") } // healthz returns ok for healthz request