From 70107d71ad2aab45967cd3cc5ea33cd410d371e0 Mon Sep 17 00:00:00 2001 From: hxcGit Date: Tue, 27 Sep 2022 16:59:55 +0800 Subject: [PATCH] forbidden ota upgrade when cloud-edge is disconnected Signed-off-by: hxcGit --- cmd/yurthub/app/start.go | 2 +- pkg/yurthub/otaupdate/ota.go | 50 +++++++++++++++++++------------ pkg/yurthub/otaupdate/ota_test.go | 50 +++++++++++++++++++++++-------- pkg/yurthub/server/server.go | 14 +++++---- 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 13025dfe7f3..3430446ea15 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { cfg.YurtSharedFactory.Start(stopCh) klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr) - s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr) + s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker) if err != nil { return fmt.Errorf("could not create hub server, %w", err) } diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index 3b8f3bcfb96..8ed47ee0b02 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -21,22 +21,29 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "github.com/gorilla/mux" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" -) -// TODO(hxc): should use pkg/controller/daemonpodupdater.PodNeedUpgrade -const ( - PodNeedUpgrade corev1.PodConditionType = "PodNeedUpgrade" + "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" ) // GetPods return pod list -func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { +func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, + servers []*url.URL) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Pre-check if edge yurthub node is connected to the cloud + if !isEdgeCloudConnected(checker, servers) { + klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud") + returnErr(fmt.Errorf("Get pod list is not allowed when edge is disconnected to cloud"), + w, http.StatusForbidden) + return + } + podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ FieldSelector: "spec.nodeName=" + nodeName, }) @@ -52,7 +59,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { data, err := json.Marshal(podList) if err != nil { klog.Errorf("Marshal pod list failed: %v", err.Error()) - returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), w, http.StatusInternalServerError) + returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), + w, http.StatusInternalServerError) return } @@ -65,8 +73,17 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler { } // UpdatePod update a specifc pod(namespace/podname) to the latest version -func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler { +func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker, + servers []*url.URL) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Pre-check if edge yurthub node is connected to the cloud + if !isEdgeCloudConnected(checker, servers) { + klog.Errorf("Apply update is not allowed when edge is disconnected to cloud") + returnErr(fmt.Errorf("Apply update is not allowed when edge is disconnected to cloud"), + w, http.StatusForbidden) + return + } + params := mux.Vars(r) namespace := params["ns"] podName := params["podname"] @@ -105,7 +122,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st } // Pod will not be updated without pod condition PodNeedUpgrade=true - if !IsPodUpdatable(pod) { + if !daemonpodupdater.IsPodUpdatable(pod) { klog.Infof("Pod: %v/%v is not updatable", namespace, podName) return nil, false } @@ -131,18 +148,13 @@ func returnErr(err error, w http.ResponseWriter, errType int) { } } -// TODO(hxc): should use pkg/controller/daemonpodupdater.IsPodUpdatable() -// IsPodUpdatable returns true if a pod is updatable; false otherwise. -func IsPodUpdatable(pod *corev1.Pod) bool { - if &pod.Status == nil || len(pod.Status.Conditions) == 0 { - return false - } - - for i := range pod.Status.Conditions { - if pod.Status.Conditions[i].Type == PodNeedUpgrade && pod.Status.Conditions[i].Status == "true" { +// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is +// regarded as connected. Otherwise, it is regarded as disconnected and return false. +func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool { + for _, server := range remoteServers { + if checker.IsHealthy(server) { return true } } - return false } diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index d53e4e04a28..85ceeaadddc 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -19,7 +19,7 @@ package otaupdate import ( "net/http" "net/http/httptest" - "strconv" + "net/url" "testing" "github.com/gorilla/mux" @@ -27,6 +27,27 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" +) + +var ( + healthyServers = []*url.URL{ + {Host: "127.0.0.1:18080"}, + } + + unHealthyServers = []*url.URL{ + {Host: "127.0.0.1:18081"}, + } + + healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{ + "http://127.0.0.1:8080": 1, + }) + + unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{ + "http://127.0.0.1:8081": 1, + }) ) func newPod(podName string) *corev1.Pod { @@ -44,24 +65,24 @@ func newPod(podName string) *corev1.Pod { return pod } -func newPodWithCondition(podName string, ready bool) *corev1.Pod { +func newPodWithCondition(podName string, ready corev1.ConditionStatus) *corev1.Pod { pod := newPod(podName) SetPodUpgradeCondition(pod, ready) return pod } -func SetPodUpgradeCondition(pod *corev1.Pod, ok bool) { +func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) { cond := corev1.PodCondition{ - Type: PodNeedUpgrade, - Status: corev1.ConditionStatus(strconv.FormatBool(ok)), + Type: daemonpodupdater.PodNeedUpgrade, + Status: ready, } pod.Status.Conditions = append(pod.Status.Conditions, cond) } func TestGetPods(t *testing.T) { - updatablePod := newPodWithCondition("updatablePod", true) - notUpdatablePod := newPodWithCondition("notUpdatablePod", false) + updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue) + notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse) normalPod := newPod("normalPod") clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod) @@ -72,10 +93,15 @@ func TestGetPods(t *testing.T) { } rr := httptest.NewRecorder() - GetPods(clientset, "").ServeHTTP(rr, req) + GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) expectedCode := http.StatusOK assert.Equal(t, expectedCode, rr.Code) + + // Cloud-Edge network disconnected + rr = httptest.NewRecorder() + GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req) + assert.Equal(t, http.StatusForbidden, rr.Code) } func TestUpdatePod(t *testing.T) { @@ -89,21 +115,21 @@ func TestUpdatePod(t *testing.T) { { reqURL: "/openyurt.io/v1/namespaces/default/pods/updatablePod/update", podName: "updatablePod", - pod: newPodWithCondition("updatablePod", true), + pod: newPodWithCondition("updatablePod", corev1.ConditionTrue), expectedCode: http.StatusOK, expectedData: "", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update", podName: "notUpdatablePod", - pod: newPodWithCondition("notUpdatablePod", false), + pod: newPodWithCondition("notUpdatablePod", corev1.ConditionFalse), expectedCode: http.StatusForbidden, expectedData: "Pod is not-updatable", }, { reqURL: "/openyurt.io/v1/namespaces/default/pods/wrongName/update", podName: "wrongName", - pod: newPodWithCondition("trueName", true), + pod: newPodWithCondition("trueName", corev1.ConditionFalse), expectedCode: http.StatusInternalServerError, expectedData: "Apply update failed", }, @@ -122,7 +148,7 @@ func TestUpdatePod(t *testing.T) { req = mux.SetURLVars(req, vars) rr := httptest.NewRecorder() - UpdatePod(clientset, "").ServeHTTP(rr, req) + UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req) assert.Equal(t, test.expectedCode, rr.Code) assert.Equal(t, test.expectedData, rr.Body.String()) diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 8f87fa37c15..104a1206226 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/openyurtio/openyurt/pkg/util/certmanager" certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" + "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate" ) @@ -61,7 +62,8 @@ type yurtHubServer struct { func NewYurtHubServer(cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, proxyHandler http.Handler, - rest *rest.RestConfigManager) (Server, error) { + rest *rest.RestConfigManager, + checker healthchecker.HealthChecker) (Server, error) { hubMux := mux.NewRouter() restCfg := rest.GetRestConfig(false) clientSet, err := kubernetes.NewForConfig(restCfg) @@ -69,7 +71,7 @@ func NewYurtHubServer(cfg *config.YurtHubConfiguration, klog.Errorf("cannot create the client set: %v", err) return nil, err } - registerHandlers(hubMux, cfg, certificateMgr, clientSet, cfg.NodeName) + registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker) hubServer := &http.Server{ Addr: cfg.YurtHubServerAddr, Handler: hubMux, @@ -157,7 +159,8 @@ func (s *yurtHubServer) Run() { } // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. -func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset, nodeName string) { +func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, + clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) { // register handlers for update join token c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT") @@ -173,8 +176,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica c.Handle("/metrics", promhttp.Handler()) // register handler for ota upgrade - c.Handle("/pods", ota.GetPods(clientset, nodeName)).Methods("GET") - c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset, nodeName)).Methods("POST") + c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET") + c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", + ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST") } // healthz returns ok for healthz request