Skip to content

Commit

Permalink
fix cloud-edge connection judgment
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <houxc_mail@163.com>
  • Loading branch information
xavier-hou committed Sep 29, 2022
1 parent 24cc835 commit d457d13
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
cfg.YurtSharedFactory.Start(stopCh)

klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr)
if err != nil {
return fmt.Errorf("could not create hub server, %w", err)
}
Expand Down
63 changes: 29 additions & 34 deletions pkg/yurthub/otaupdate/ota.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"net/http"
"net/url"

"github.com/gorilla/mux"
corev1 "k8s.io/api/core/v1"
Expand All @@ -32,22 +31,16 @@ import (
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
)

type OTAHandler func(kubernetes.Interface, string) http.Handler

// GetPods return pod list
func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker,
servers []*url.URL) http.Handler {
func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Pre-check if edge yurthub node is connected to the cloud
if !isEdgeCloudConnected(checker, servers) {
klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud")
WriteErr(w, "Get pod list is not allowed when edge is disconnected to cloud", http.StatusForbidden)
return
}

podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + nodeName,
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
})
if err != nil {
klog.Errorf("Get pod list failed, %v", err)
Expand All @@ -67,16 +60,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchec
}

// UpdatePod update a specifc pod(namespace/podname) to the latest version
func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker,
servers []*url.URL) http.Handler {
func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Pre-check if edge yurthub node is connected to the cloud
if !isEdgeCloudConnected(checker, servers) {
klog.Errorf("Apply update is not allowed when edge is disconnected to cloud")
WriteErr(w, "Apply update is not allowed when edge is disconnected to cloud", http.StatusForbidden)
return
}

params := mux.Vars(r)
namespace := params["ns"]
podName := params["podname"]
Expand All @@ -94,7 +79,7 @@ func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthch
}

// Successfully apply update, response 200
WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %q/%q", namespace, podName)))
WriteJSONResponse(w, []byte(fmt.Sprintf("Start updating pod %v/%v", namespace, podName)))
})
}

Expand Down Expand Up @@ -131,7 +116,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st
return err, false
}

klog.Infof("Start updating pod: %q/%q", namespace, podName)
klog.Infof("Start updating pod: %v/%v", namespace, podName)
return nil, true
}

Expand All @@ -141,17 +126,6 @@ func encodePodList(podList *corev1.PodList) ([]byte, error) {
return runtime.Encode(codec, podList)
}

// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is
// regarded as connected. Otherwise, it is regarded as disconnected and return false.
func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool {
for _, server := range remoteServers {
if checker.IsHealthy(server) {
return true
}
}
return false
}

// WriteErr writes the http status and the error string on the response
func WriteErr(w http.ResponseWriter, errReason string, httpStatus int) {
w.WriteHeader(httpStatus)
Expand All @@ -175,3 +149,24 @@ func WriteJSONResponse(w http.ResponseWriter, data []byte) {
klog.Errorf("Write resp for request, expect %d bytes but write %d bytes with error, %v", len(data), n, err)
}
}

// HealthyCheck checks if cloud-edge is disconnected before ota update handle, ota update is not allowed when disconnected
func HealthyCheck(rest *rest.RestConfigManager, nodeName string, handler OTAHandler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
restCfg := rest.GetRestConfig(true)
if restCfg == nil {
klog.Infof("Get pod list is not allowed when edge is disconnected to cloud")
WriteErr(w, "OTA update is not allowed when edge is disconnected to cloud", http.StatusForbidden)
return
}

clientSet, err := kubernetes.NewForConfig(restCfg)
if err != nil {
klog.Errorf("Get client set failed: %v", err)
WriteErr(w, "Get client set failed", http.StatusInternalServerError)
return
}

handler(clientSet, nodeName).ServeHTTP(w, r)
})
}
31 changes: 3 additions & 28 deletions pkg/yurthub/otaupdate/ota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package otaupdate
import (
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/gorilla/mux"
Expand All @@ -29,25 +28,6 @@ import (
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

var (
healthyServers = []*url.URL{
{Host: "127.0.0.1:18080"},
}

unHealthyServers = []*url.URL{
{Host: "127.0.0.1:18081"},
}

healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{
"http://127.0.0.1:8080": 1,
})

unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{
"http://127.0.0.1:8081": 1,
})
)

func newPod(podName string) *corev1.Pod {
Expand Down Expand Up @@ -93,15 +73,10 @@ func TestGetPods(t *testing.T) {
}
rr := httptest.NewRecorder()

GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req)
GetPods(clientset, "").ServeHTTP(rr, req)

expectedCode := http.StatusOK
assert.Equal(t, expectedCode, rr.Code)

// Cloud-Edge network disconnected
rr = httptest.NewRecorder()
GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req)
assert.Equal(t, http.StatusForbidden, rr.Code)
}

func TestUpdatePod(t *testing.T) {
Expand All @@ -117,7 +92,7 @@ func TestUpdatePod(t *testing.T) {
podName: "updatablePod",
pod: newPodWithCondition("updatablePod", corev1.ConditionTrue),
expectedCode: http.StatusOK,
expectedData: "Start updating pod \"default\"/\"updatablePod\"",
expectedData: "Start updating pod default/updatablePod",
},
{
reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update",
Expand Down Expand Up @@ -148,7 +123,7 @@ func TestUpdatePod(t *testing.T) {
req = mux.SetURLVars(req, vars)
rr := httptest.NewRecorder()

UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req)
UpdatePod(clientset, "").ServeHTTP(rr, req)

assert.Equal(t, test.expectedCode, rr.Code)
assert.Equal(t, test.expectedData, rr.Body.String())
Expand Down
12 changes: 5 additions & 7 deletions pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/openyurtio/openyurt/pkg/util/certmanager"
certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate"
)
Expand All @@ -62,16 +61,15 @@ type yurtHubServer struct {
func NewYurtHubServer(cfg *config.YurtHubConfiguration,
certificateMgr interfaces.YurtCertificateManager,
proxyHandler http.Handler,
rest *rest.RestConfigManager,
checker healthchecker.HealthChecker) (Server, error) {
rest *rest.RestConfigManager) (Server, error) {
hubMux := mux.NewRouter()
restCfg := rest.GetRestConfig(false)
clientSet, err := kubernetes.NewForConfig(restCfg)
if err != nil {
klog.Errorf("cannot create the client set: %v", err)
return nil, err
}
registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker)
registerHandlers(hubMux, cfg, certificateMgr, rest)
hubServer := &http.Server{
Addr: cfg.YurtHubServerAddr,
Handler: hubMux,
Expand Down Expand Up @@ -160,7 +158,7 @@ func (s *yurtHubServer) Run() {

// registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token.
func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager,
clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) {
rest *rest.RestConfigManager) {
// register handlers for update join token
c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT")

Expand All @@ -176,9 +174,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica
c.Handle("/metrics", promhttp.Handler())

// register handler for ota upgrade
c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET")
c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET")
c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade",
ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST")
ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST")
}

// healthz returns ok for healthz request
Expand Down

0 comments on commit d457d13

Please sign in to comment.