Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koorldet: remove logging field from the kubelet configz #1907

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pkg/koordlet/statesinformer/impl/kubelet_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/cmd/kubelet/app/options"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -99,6 +100,7 @@ type kubeletConfigz struct {
ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
}

// GetKubeletConfiguration removes the logging field from the configz during unmarshall to make sure the configz is compatible
func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletConfiguration, error) {
configzURL := url.URL{
Scheme: k.scheme,
Expand All @@ -119,10 +121,15 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC
if err != nil {
return nil, err
}
klog.V(6).Infof("get kubelet configz: %s", string(body))

var configz kubeletConfigz
if err = json.Unmarshal(body, &configz); err != nil {
return nil, fmt.Errorf("failed to unmarshal kubeletConfigz: %v", err)
// TODO remove unmarshalFromNewVersionConfig after upgrade k8s dependency to 1.28
if newVersionErr := unmarshalFromNewVersionConfig(body, &configz); newVersionErr != nil {
return nil, fmt.Errorf("failed to unmarshal new version kubeletConfigz, error %v, origin error :%v",
newVersionErr, err)
}
}

kubeletConfiguration, err := options.NewKubeletConfiguration()
Expand All @@ -139,3 +146,28 @@ func (k *kubeletStub) GetKubeletConfiguration() (*kubeletconfiginternal.KubeletC
}
return kubeletConfiguration, nil
}

// In the Kubernetes 1.28, the Kubelet configuration introduces an API change that is not forward-compatible:
// v1.24: https://github.com/kubernetes/component-base/blob/release-1.24/config/types.go#L99
// v1.26: https://github.com/kubernetes/component-base/blob/release-1.26/logs/api/v1/types.go#L45
// v1.28: https://github.com/kubernetes/component-base/blob/release-1.28/logs/api/v1/types.go#L48
// unmarshalFromNewVersionConfig removes the logging field from the configz
func unmarshalFromNewVersionConfig(body []byte, configz *kubeletConfigz) error {
rawConfig := struct {
KubeletConfig map[string]interface{} `json:"kubeletconfig"`
}{}
if err := json.Unmarshal(body, &rawConfig); err != nil {
return fmt.Errorf("failed to unmarshal kubeletConfigz to raw map: %v", err)
}
delete(rawConfig.KubeletConfig, "logging")
scrubbedBody, err := json.Marshal(rawConfig)
if err != nil {
return fmt.Errorf("failed to marshal scrubbed kubeletConfigz %v, error: %v", rawConfig, err)
}

if err = json.Unmarshal(scrubbedBody, &configz); err != nil {
return fmt.Errorf("failed to unmarshal scrubbed kubeletConfigz: %v", err)
}

return nil
}
95 changes: 95 additions & 0 deletions pkg/koordlet/statesinformer/impl/kubelet_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,41 @@ var (
}
}`,
)
kubeletConfigzData128 = []byte(`
{
"kubeletconfig": {
"enableServer": true,
"cpuManagerPolicy": "static",
"cpuManagerReconcilePeriod": "10s",
"evictionHard": {
"imagefs.available": "15%",
"memory.available": "222Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%"
},
"systemReserved": {
"cpu": "200m",
"memory": "1111Mi",
"pid": "1000"
},
"kubeReserved": {
"cpu": "200m",
"memory": "6666Mi",
"pid": "1000"
},
"logging": {
"format":"text",
"flushFrequency":"5s",
"verbosity":3,
"options":{
"json":{
"infoBufferSize":"0"
}
}
}
}
}`,
)
)

func validateAuth(r *http.Request) bool {
Expand Down Expand Up @@ -109,6 +144,16 @@ func mockGetKubeletConfiguration(w http.ResponseWriter, r *http.Request) {
w.Write(kubeletConfigzData)
}

func mockGetKubeletConfiguration128(w http.ResponseWriter, r *http.Request) {
if !validateAuth(r) {
w.WriteHeader(http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write(kubeletConfigzData128)
}

func parseHostAndPort(rawURL string) (string, string, error) {
u, err := url.Parse(rawURL)
if err != nil {
Expand Down Expand Up @@ -200,6 +245,56 @@ func Test_kubeletStub_GetKubeletConfiguration(t *testing.T) {
assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved)
}

func Test_kubeletStub_GetKubeletConfigurationFrom128(t *testing.T) {
token = "token"

server := httptest.NewTLSServer(http.HandlerFunc(mockGetKubeletConfiguration128))
defer server.Close()

address, portStr, err := parseHostAndPort(server.URL)
if err != nil {
t.Fatal(err)
}

port, _ := strconv.Atoi(portStr)
cfg := &rest.Config{
Host: net.JoinHostPort(address, portStr),
BearerToken: token,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}

client, err := NewKubeletStub(address, port, "https", 10*time.Second, cfg)
if err != nil {
t.Fatal(err)
}
kubeletConfiguration, err := client.GetKubeletConfiguration()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "static", kubeletConfiguration.CPUManagerPolicy)
expectedEvictionHard := map[string]string{
"imagefs.available": "15%",
"memory.available": "222Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%",
}
expectedSystemReserved := map[string]string{
"cpu": "200m",
"memory": "1111Mi",
"pid": "1000",
}
expectedKubeReserved := map[string]string{
"cpu": "200m",
"memory": "6666Mi",
"pid": "1000",
}
assert.Equal(t, expectedEvictionHard, kubeletConfiguration.EvictionHard)
assert.Equal(t, expectedSystemReserved, kubeletConfiguration.SystemReserved)
assert.Equal(t, expectedKubeReserved, kubeletConfiguration.KubeReserved)
}

func TestNewKubeletStub(t *testing.T) {
type args struct {
addr string
Expand Down
Loading