Skip to content

Commit

Permalink
forbidden ota upgrade when cloud-edge is disconnected
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <houxc_mail@163.com>
  • Loading branch information
xavier-hou committed Sep 27, 2022
1 parent 5a4677c commit 70107d7
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
cfg.YurtSharedFactory.Start(stopCh)

klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler, restConfigMgr, healthChecker)
if err != nil {
return fmt.Errorf("could not create hub server, %w", err)
}
Expand Down
50 changes: 31 additions & 19 deletions pkg/yurthub/otaupdate/ota.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/gorilla/mux"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

// TODO(hxc): should use pkg/controller/daemonpodupdater.PodNeedUpgrade
const (
PodNeedUpgrade corev1.PodConditionType = "PodNeedUpgrade"
"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

// GetPods return pod list
func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
func GetPods(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker,
servers []*url.URL) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Pre-check if edge yurthub node is connected to the cloud
if !isEdgeCloudConnected(checker, servers) {
klog.Errorf("Get pod list is not allowed when edge is disconnected to cloud")
returnErr(fmt.Errorf("Get pod list is not allowed when edge is disconnected to cloud"),
w, http.StatusForbidden)
return
}

podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + nodeName,
})
Expand All @@ -52,7 +59,8 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
data, err := json.Marshal(podList)
if err != nil {
klog.Errorf("Marshal pod list failed: %v", err.Error())
returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."), w, http.StatusInternalServerError)
returnErr(fmt.Errorf("Get pod list failed: data transfer to json format failed."),
w, http.StatusInternalServerError)
return
}

Expand All @@ -65,8 +73,17 @@ func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
}

// UpdatePod update a specifc pod(namespace/podname) to the latest version
func UpdatePod(clientset kubernetes.Interface, nodeName string) http.Handler {
func UpdatePod(clientset kubernetes.Interface, nodeName string, checker healthchecker.HealthChecker,
servers []*url.URL) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Pre-check if edge yurthub node is connected to the cloud
if !isEdgeCloudConnected(checker, servers) {
klog.Errorf("Apply update is not allowed when edge is disconnected to cloud")
returnErr(fmt.Errorf("Apply update is not allowed when edge is disconnected to cloud"),
w, http.StatusForbidden)
return
}

params := mux.Vars(r)
namespace := params["ns"]
podName := params["podname"]
Expand Down Expand Up @@ -105,7 +122,7 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st
}

// Pod will not be updated without pod condition PodNeedUpgrade=true
if !IsPodUpdatable(pod) {
if !daemonpodupdater.IsPodUpdatable(pod) {
klog.Infof("Pod: %v/%v is not updatable", namespace, podName)
return nil, false
}
Expand All @@ -131,18 +148,13 @@ func returnErr(err error, w http.ResponseWriter, errType int) {
}
}

// TODO(hxc): should use pkg/controller/daemonpodupdater.IsPodUpdatable()
// IsPodUpdatable returns true if a pod is updatable; false otherwise.
func IsPodUpdatable(pod *corev1.Pod) bool {
if &pod.Status == nil || len(pod.Status.Conditions) == 0 {
return false
}

for i := range pod.Status.Conditions {
if pod.Status.Conditions[i].Type == PodNeedUpgrade && pod.Status.Conditions[i].Status == "true" {
// isEdgeCloudConnected will check if edge is disconnected to cloud. If there is any remote server is healthy, it is
// regarded as connected. Otherwise, it is regarded as disconnected and return false.
func isEdgeCloudConnected(checker healthchecker.HealthChecker, remoteServers []*url.URL) bool {
for _, server := range remoteServers {
if checker.IsHealthy(server) {
return true
}
}

return false
}
50 changes: 38 additions & 12 deletions pkg/yurthub/otaupdate/ota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,35 @@ package otaupdate
import (
"net/http"
"net/http/httptest"
"strconv"
"net/url"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
)

var (
healthyServers = []*url.URL{
{Host: "127.0.0.1:18080"},
}

unHealthyServers = []*url.URL{
{Host: "127.0.0.1:18081"},
}

healthyFakeChecker = healthchecker.NewFakeChecker(true, map[string]int{
"http://127.0.0.1:8080": 1,
})

unHealthyFakeChecker = healthchecker.NewFakeChecker(false, map[string]int{
"http://127.0.0.1:8081": 1,
})
)

func newPod(podName string) *corev1.Pod {
Expand All @@ -44,24 +65,24 @@ func newPod(podName string) *corev1.Pod {
return pod
}

func newPodWithCondition(podName string, ready bool) *corev1.Pod {
func newPodWithCondition(podName string, ready corev1.ConditionStatus) *corev1.Pod {
pod := newPod(podName)
SetPodUpgradeCondition(pod, ready)

return pod
}

func SetPodUpgradeCondition(pod *corev1.Pod, ok bool) {
func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) {
cond := corev1.PodCondition{
Type: PodNeedUpgrade,
Status: corev1.ConditionStatus(strconv.FormatBool(ok)),
Type: daemonpodupdater.PodNeedUpgrade,
Status: ready,
}
pod.Status.Conditions = append(pod.Status.Conditions, cond)
}

func TestGetPods(t *testing.T) {
updatablePod := newPodWithCondition("updatablePod", true)
notUpdatablePod := newPodWithCondition("notUpdatablePod", false)
updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue)
notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse)
normalPod := newPod("normalPod")

clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod)
Expand All @@ -72,10 +93,15 @@ func TestGetPods(t *testing.T) {
}
rr := httptest.NewRecorder()

GetPods(clientset, "").ServeHTTP(rr, req)
GetPods(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req)

expectedCode := http.StatusOK
assert.Equal(t, expectedCode, rr.Code)

// Cloud-Edge network disconnected
rr = httptest.NewRecorder()
GetPods(clientset, "", unHealthyFakeChecker, unHealthyServers).ServeHTTP(rr, req)
assert.Equal(t, http.StatusForbidden, rr.Code)
}

func TestUpdatePod(t *testing.T) {
Expand All @@ -89,21 +115,21 @@ func TestUpdatePod(t *testing.T) {
{
reqURL: "/openyurt.io/v1/namespaces/default/pods/updatablePod/update",
podName: "updatablePod",
pod: newPodWithCondition("updatablePod", true),
pod: newPodWithCondition("updatablePod", corev1.ConditionTrue),
expectedCode: http.StatusOK,
expectedData: "",
},
{
reqURL: "/openyurt.io/v1/namespaces/default/pods/notUpdatablePod/update",
podName: "notUpdatablePod",
pod: newPodWithCondition("notUpdatablePod", false),
pod: newPodWithCondition("notUpdatablePod", corev1.ConditionFalse),
expectedCode: http.StatusForbidden,
expectedData: "Pod is not-updatable",
},
{
reqURL: "/openyurt.io/v1/namespaces/default/pods/wrongName/update",
podName: "wrongName",
pod: newPodWithCondition("trueName", true),
pod: newPodWithCondition("trueName", corev1.ConditionFalse),
expectedCode: http.StatusInternalServerError,
expectedData: "Apply update failed",
},
Expand All @@ -122,7 +148,7 @@ func TestUpdatePod(t *testing.T) {
req = mux.SetURLVars(req, vars)
rr := httptest.NewRecorder()

UpdatePod(clientset, "").ServeHTTP(rr, req)
UpdatePod(clientset, "", healthyFakeChecker, healthyServers).ServeHTTP(rr, req)

assert.Equal(t, test.expectedCode, rr.Code)
assert.Equal(t, test.expectedData, rr.Body.String())
Expand Down
14 changes: 9 additions & 5 deletions pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/openyurtio/openyurt/pkg/util/certmanager"
certfactory "github.com/openyurtio/openyurt/pkg/util/certmanager/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
ota "github.com/openyurtio/openyurt/pkg/yurthub/otaupdate"
)
Expand All @@ -61,15 +62,16 @@ type yurtHubServer struct {
func NewYurtHubServer(cfg *config.YurtHubConfiguration,
certificateMgr interfaces.YurtCertificateManager,
proxyHandler http.Handler,
rest *rest.RestConfigManager) (Server, error) {
rest *rest.RestConfigManager,
checker healthchecker.HealthChecker) (Server, error) {
hubMux := mux.NewRouter()
restCfg := rest.GetRestConfig(false)
clientSet, err := kubernetes.NewForConfig(restCfg)
if err != nil {
klog.Errorf("cannot create the client set: %v", err)
return nil, err
}
registerHandlers(hubMux, cfg, certificateMgr, clientSet, cfg.NodeName)
registerHandlers(hubMux, cfg, certificateMgr, clientSet, checker)
hubServer := &http.Server{
Addr: cfg.YurtHubServerAddr,
Handler: hubMux,
Expand Down Expand Up @@ -157,7 +159,8 @@ func (s *yurtHubServer) Run() {
}

// registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token.
func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager, clientset *kubernetes.Clientset, nodeName string) {
func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager,
clientset *kubernetes.Clientset, checker healthchecker.HealthChecker) {
// register handlers for update join token
c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT")

Expand All @@ -173,8 +176,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica
c.Handle("/metrics", promhttp.Handler())

// register handler for ota upgrade
c.Handle("/pods", ota.GetPods(clientset, nodeName)).Methods("GET")
c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade", ota.UpdatePod(clientset, nodeName)).Methods("POST")
c.Handle("/pods", ota.GetPods(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("GET")
c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade",
ota.UpdatePod(clientset, cfg.NodeName, checker, cfg.RemoteServers)).Methods("POST")
}

// healthz returns ok for healthz request
Expand Down

0 comments on commit 70107d7

Please sign in to comment.