Skip to content

Commit

Permalink
koorldet: remove logging field from the kubelet configz (#1907)
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <zzw261520@alibaba-inc.com>
  • Loading branch information
zwzhang0107 authored Feb 26, 2024
1 parent 6488597 commit ee2aa04
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 1 deletion.
34 changes: 33 additions & 1 deletion pkg/koordlet/statesinformer/impl/kubelet_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -99,6 +100,7 @@ type kubeletConfigz struct {
ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
}

// GetKubeletConfiguration removes the logging field from the configz during unmarshall to make sure the configz is compatible
func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) {
configzURL := url.URL{
Scheme: k.scheme,
Expand All @@ -119,10 +121,15 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC
if err != nil {
return nil, err
}
klog.V(6).Infof("get kubelet configz: %s", string(body))

var configz kubeletConfigz
if err = json.Unmarshal(body, &configz); err != nil {
return nil, fmt.Errorf("failed to unmarshal kubeletConfigz: %v", err)
// TODO remove unmarshalFromNewVersionConfig after upgrade k8s dependency to 1.28
if newVersionErr := unmarshalFromNewVersionConfig(body, &configz); newVersionErr != nil {
return nil, fmt.Errorf("failed to unmarshal new version kubeletConfigz, error %v, origin error :%v",
newVersionErr, err)
}
}

kubeletConfiguration, err := options.NewKubeletConfiguration()
Expand All @@ -139,3 +146,28 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC
}
return kubeletConfiguration, nil
}

// In the Kubernetes 1.28, the Kubelet configuration introduces an API change that is not forward-compatible:
// v1.24: https://github.com/kubernetes/component-base/blob/release-1.24/config/types.go#L99
// v1.26: https://github.com/kubernetes/component-base/blob/release-1.26/logs/api/v1/types.go#L45
// v1.28: https://github.com/kubernetes/component-base/blob/release-1.28/logs/api/v1/types.go#L48
// unmarshalFromNewVersionConfig removes the logging field from the configz
func unmarshalFromNewVersionConfig(body []byte, configz *kubeletConfigz) error {
rawConfig := struct {
KubeletConfig map[string]interface{} `json:"kubeletconfig"`
}{}
if err := json.Unmarshal(body, &rawConfig); err != nil {
return fmt.Errorf("failed to unmarshal kubeletConfigz to raw map: %v", err)
}
delete(rawConfig.KubeletConfig, "logging")
scrubbedBody, err := json.Marshal(rawConfig)
if err != nil {
return fmt.Errorf("failed to marshal scrubbed kubeletConfigz %v, error: %v", rawConfig, err)
}

if err = json.Unmarshal(scrubbedBody, &configz); err != nil {
return fmt.Errorf("failed to unmarshal scrubbed kubeletConfigz: %v", err)
}

return nil
}
95 changes: 95 additions & 0 deletions pkg/koordlet/statesinformer/impl/kubelet_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,41 @@ var (
}
}`,
)
kubeletConfigzData128 = []byte(`
{
"kubeletconfig": {
"enableServer": true,
"cpuManagerPolicy": "static",
"cpuManagerReconcilePeriod": "10s",
"evictionHard": {
"imagefs.available": "15%",
"memory.available": "222Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%"
},
"systemReserved": {
"cpu": "200m",
"memory": "1111Mi",
"pid": "1000"
},
"kubeReserved": {
"cpu": "200m",
"memory": "6666Mi",
"pid": "1000"
},
"logging": {
"format":"text",
"flushFrequency":"5s",
"verbosity":3,
"options":{
"json":{
"infoBufferSize":"0"
}
}
}
}
}`,
)
)

func validateAuth(r *http.Request) bool {
Expand Down Expand Up @@ -109,6 +144,16 @@ func mockGetKubeletConfiguration(w http.ResponseWriter, r *http.Request) {
w.Write(kubeletConfigzData)
}

func mockGetKubeletConfiguration128(w http.ResponseWriter, r *http.Request) {
if !validateAuth(r) {
w.WriteHeader(http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write(kubeletConfigzData128)
}

func parseHostAndPort(rawURL string) (string, string, error) {
u, err := url.Parse(rawURL)
if err != nil {
Expand Down Expand Up @@ -200,6 +245,56 @@ func Test_kubeletStub_GetKubeletConfiguration(t *testing.T) {
assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved)
}

func Test_kubeletStub_GetKubeletConfigurationFrom128(t *testing.T) {
token = "token"

server := httptest.NewTLSServer(http.HandlerFunc(mockGetKubeletConfiguration128))
defer server.Close()

address, portStr, err := parseHostAndPort(server.URL)
if err != nil {
t.Fatal(err)
}

port, _ := strconv.Atoi(portStr)
cfg := &rest.Config{
Host: net.JoinHostPort(address, portStr),
BearerToken: token,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}

client, err := NewKubeletStub(address, port, "https", 10*time.Second, cfg)
if err != nil {
t.Fatal(err)
}
kubeletConfiguration, err := client.GetKubeletConfiguration()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "static", kubeletConfiguration.CPUManagerPolicy)
expectedEvictionHard := map[string]string{
"imagefs.available": "15%",
"memory.available": "222Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%",
}
expectedSystemReserved := map[string]string{
"cpu": "200m",
"memory": "1111Mi",
"pid": "1000",
}
expectedKubeReserved := map[string]string{
"cpu": "200m",
"memory": "6666Mi",
"pid": "1000",
}
assert.Equal(t, expectedEvictionHard, kubeletConfiguration.EvictionHard)
assert.Equal(t, expectedSystemReserved, kubeletConfiguration.SystemReserved)
assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved)
}

func TestNewKubeletStub(t *testing.T) {
type args struct {
addr string
Expand Down

0 comments on commit ee2aa04

Please sign in to comment.