diff --git a/pkg/koordlet/statesinformer/impl/kubelet_stub.go b/pkg/koordlet/statesinformer/impl/kubelet_stub.go index 9a03cb8c6..fca401c04 100644 --- a/pkg/koordlet/statesinformer/impl/kubelet_stub.go +++ b/pkg/koordlet/statesinformer/impl/kubelet_stub.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" + "k8s.io/klog/v2" kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" "k8s.io/kubernetes/cmd/kubelet/app/options" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -99,6 +100,7 @@ type kubeletConfigz struct { ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"` } +// GetKubeletConfiguration removes the logging field from the configz during unmarshall to make sure the configz is compatible func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) { configzURL := url.URL{ Scheme: k.scheme, @@ -119,10 +121,15 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC if err != nil { return nil, err } + klog.V(6).Infof("get kubelet configz: %s", string(body)) var configz kubeletConfigz if err = json.Unmarshal(body, &configz); err != nil { - return nil, fmt.Errorf("failed to unmarshal kubeletConfigz: %v", err) + // TODO remove unmarshalFromNewVersionConfig after upgrade k8s dependency to 1.28 + if newVersionErr := unmarshalFromNewVersionConfig(body, &configz); newVersionErr != nil { + return nil, fmt.Errorf("failed to unmarshal new version kubeletConfigz, error %v, origin error :%v", + newVersionErr, err) + } } kubeletConfiguration, err := options.NewKubeletConfiguration() @@ -139,3 +146,28 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC } return kubeletConfiguration, nil } + +// In the Kubernetes 1.28, the Kubelet configuration introduces an API change that is not forward-compatible: +// v1.24: https://github.com/kubernetes/component-base/blob/release-1.24/config/types.go#L99 +// v1.26: https://github.com/kubernetes/component-base/blob/release-1.26/logs/api/v1/types.go#L45 +// v1.28: https://github.com/kubernetes/component-base/blob/release-1.28/logs/api/v1/types.go#L48 +// unmarshalFromNewVersionConfig removes the logging field from the configz +func unmarshalFromNewVersionConfig(body []byte, configz *kubeletConfigz) error { + rawConfig := struct { + KubeletConfig map[string]interface{} `json:"kubeletconfig"` + }{} + if err := json.Unmarshal(body, &rawConfig); err != nil { + return fmt.Errorf("failed to unmarshal kubeletConfigz to raw map: %v", err) + } + delete(rawConfig.KubeletConfig, "logging") + scrubbedBody, err := json.Marshal(rawConfig) + if err != nil { + return fmt.Errorf("failed to marshal scrubbed kubeletConfigz %v, error: %v", rawConfig, err) + } + + if err = json.Unmarshal(scrubbedBody, &configz); err != nil { + return fmt.Errorf("failed to unmarshal scrubbed kubeletConfigz: %v", err) + } + + return nil +} diff --git a/pkg/koordlet/statesinformer/impl/kubelet_stub_test.go b/pkg/koordlet/statesinformer/impl/kubelet_stub_test.go index 9c7570436..9d5b319fa 100644 --- a/pkg/koordlet/statesinformer/impl/kubelet_stub_test.go +++ b/pkg/koordlet/statesinformer/impl/kubelet_stub_test.go @@ -60,6 +60,41 @@ var ( } }`, ) + kubeletConfigzData128 = []byte(` + { + "kubeletconfig": { + "enableServer": true, + "cpuManagerPolicy": "static", + "cpuManagerReconcilePeriod": "10s", + "evictionHard": { + "imagefs.available": "15%", + "memory.available": "222Mi", + "nodefs.available": "10%", + "nodefs.inodesFree": "5%" + }, + "systemReserved": { + "cpu": "200m", + "memory": "1111Mi", + "pid": "1000" + }, + "kubeReserved": { + "cpu": "200m", + "memory": "6666Mi", + "pid": "1000" + }, + "logging": { + "format":"text", + "flushFrequency":"5s", + "verbosity":3, + "options":{ + "json":{ + "infoBufferSize":"0" + } + } + } + } + }`, + ) ) func validateAuth(r *http.Request) bool { @@ -109,6 +144,16 @@ func mockGetKubeletConfiguration(w http.ResponseWriter, r *http.Request) { w.Write(kubeletConfigzData) } +func mockGetKubeletConfiguration128(w http.ResponseWriter, r *http.Request) { + if !validateAuth(r) { + w.WriteHeader(http.StatusUnauthorized) + return + } + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write(kubeletConfigzData128) +} + func parseHostAndPort(rawURL string) (string, string, error) { u, err := url.Parse(rawURL) if err != nil { @@ -200,6 +245,56 @@ func Test_kubeletStub_GetKubeletConfiguration(t *testing.T) { assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved) } +func Test_kubeletStub_GetKubeletConfigurationFrom128(t *testing.T) { + token = "token" + + server := httptest.NewTLSServer(http.HandlerFunc(mockGetKubeletConfiguration128)) + defer server.Close() + + address, portStr, err := parseHostAndPort(server.URL) + if err != nil { + t.Fatal(err) + } + + port, _ := strconv.Atoi(portStr) + cfg := &rest.Config{ + Host: net.JoinHostPort(address, portStr), + BearerToken: token, + TLSClientConfig: rest.TLSClientConfig{ + Insecure: true, + }, + } + + client, err := NewKubeletStub(address, port, "https", 10*time.Second, cfg) + if err != nil { + t.Fatal(err) + } + kubeletConfiguration, err := client.GetKubeletConfiguration() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "static", kubeletConfiguration.CPUManagerPolicy) + expectedEvictionHard := map[string]string{ + "imagefs.available": "15%", + "memory.available": "222Mi", + "nodefs.available": "10%", + "nodefs.inodesFree": "5%", + } + expectedSystemReserved := map[string]string{ + "cpu": "200m", + "memory": "1111Mi", + "pid": "1000", + } + expectedKubeReserved := map[string]string{ + "cpu": "200m", + "memory": "6666Mi", + "pid": "1000", + } + assert.Equal(t, expectedEvictionHard, kubeletConfiguration.EvictionHard) + assert.Equal(t, expectedSystemReserved, kubeletConfiguration.SystemReserved) + assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved) +} + func TestNewKubeletStub(t *testing.T) { type args struct { addr string