Skip to content

Commit

Permalink
use local cache to get pods
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <houxc_mail@163.com>
  • Loading branch information
xavier-hou committed Sep 29, 2022
1 parent d457d13 commit a269988
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
25 changes: 17 additions & 8 deletions pkg/yurthub/otaupdate/ota.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,35 @@ import (
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
)

type OTAHandler func(kubernetes.Interface, string) http.Handler

// GetPods return pod list
func GetPods(clientset kubernetes.Interface, nodeName string) http.Handler {
func GetPods(store cachemanager.StorageWrapper) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
podList, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
})
objs, err := store.List("kubelet/pods")
if err != nil {
klog.Errorf("Get pod list failed, %v", err)
WriteErr(w, "Get pod list failed", http.StatusInternalServerError)
return
}
klog.V(5).Infof("Got pod list: %v", podList)

podList := new(corev1.PodList)
for _, obj := range objs {
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Errorf("Get pod list failed, %v", err)
WriteErr(w, "Get pod list failed", http.StatusInternalServerError)
return
}
podList.Items = append(podList.Items, *pod)
}

// Successfully get pod list, response 200
data, err := encodePodList(podList)
data, err := encodePods(podList)
if err != nil {
klog.Errorf("Encode pod list failed, %v", err)
WriteErr(w, "Encode pod list failed", http.StatusInternalServerError)
Expand Down Expand Up @@ -120,8 +129,8 @@ func applyUpdate(clientset kubernetes.Interface, namespace, podName, nodeName st
return nil, true
}

// encodePodList returns the encoded PodList
func encodePodList(podList *corev1.PodList) ([]byte, error) {
// Derived from kubelet encodePods
func encodePods(podList *corev1.PodList) (data []byte, err error) {
codec := scheme.Codecs.LegacyCodec(runtimescheme.GroupVersion{Group: corev1.GroupName, Version: "v1"})
return runtime.Encode(codec, podList)
}
Expand Down
25 changes: 22 additions & 3 deletions pkg/yurthub/otaupdate/ota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package otaupdate

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -28,11 +29,16 @@ import (
"k8s.io/client-go/kubernetes/fake"

"github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
)

func newPod(podName string) *corev1.Pod {
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: podName,
Namespace: metav1.NamespaceDefault,
Expand Down Expand Up @@ -61,19 +67,32 @@ func SetPodUpgradeCondition(pod *corev1.Pod, ready corev1.ConditionStatus) {
}

func TestGetPods(t *testing.T) {
dir := t.TempDir()
dStorage, err := disk.NewDiskStorage(dir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
sWrapper := cachemanager.NewStorageWrapper(dStorage)

updatablePod := newPodWithCondition("updatablePod", corev1.ConditionTrue)
notUpdatablePod := newPodWithCondition("notUpdatablePod", corev1.ConditionFalse)
normalPod := newPod("normalPod")

clientset := fake.NewSimpleClientset(updatablePod, notUpdatablePod, normalPod)
pods := []*corev1.Pod{updatablePod, notUpdatablePod, normalPod}
for _, pod := range pods {
err = sWrapper.Create(fmt.Sprintf("kubelet/pods/default/%s", pod.Name), pod)
if err != nil {
t.Errorf("failed to create obj, %v", err)
}
}

req, err := http.NewRequest("GET", "/openyurt.io/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
rr := httptest.NewRecorder()

GetPods(clientset, "").ServeHTTP(rr, req)
GetPods(sWrapper).ServeHTTP(rr, req)

expectedCode := http.StatusOK
assert.Equal(t, expectedCode, rr.Code)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica
c.Handle("/metrics", promhttp.Handler())

// register handler for ota upgrade
c.Handle("/pods", ota.HealthyCheck(rest, cfg.NodeName, ota.GetPods)).Methods("GET")
c.Handle("/pods", ota.GetPods(cfg.StorageWrapper)).Methods("GET")
c.Handle("/openyurt.io/v1/namespaces/{ns}/pods/{podname}/upgrade",
ota.HealthyCheck(rest, cfg.NodeName, ota.UpdatePod)).Methods("POST")
}
Expand Down

0 comments on commit a269988

Please sign in to comment.