From 7a4137d8999103078ad4dcc052db7289672ffa0d Mon Sep 17 00:00:00 2001
From: Daxin Wang
Date: Mon, 6 Jun 2022 18:07:11 +0800
Subject: [PATCH 1/7] add testcase to reproduce the bug

Signed-off-by: Daxin Wang
---
 .../metadata/kubernetes/pod_watch_test.go | 37 +++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/collector/metadata/kubernetes/pod_watch_test.go b/collector/metadata/kubernetes/pod_watch_test.go
index daf0f8653..f97a64fad 100644
--- a/collector/metadata/kubernetes/pod_watch_test.go
+++ b/collector/metadata/kubernetes/pod_watch_test.go
@@ -1,7 +1,9 @@
 package kubernetes
 
 import (
+	"encoding/json"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
@@ -192,3 +194,38 @@ func CreatePod(hasPort bool) *corev1.Pod {
 	}
 	return pod
 }
+
+func TestUpdateDelayDelete(t *testing.T) {
+	addObjJson := "{\"metadata\": {\"name\": \"testdemo2-5c86748464-26crb\",\"namespace\": \"test-ns\",\"resourceVersion\": \"44895976\"},\"spec\": {\"containers\": [{\"name\": \"testdemo2\",\"ports\": [{\"containerPort\": 9001,\"protocol\": \"TCP\"}]}]},\"status\": {\"phase\": \"Running\",\"podIP\": \"192.168.136.210\",\"containerStatuses\": [{\"name\": \"testdemo2\",\"state\": {\"running\": {\"startedAt\": \"2022-05-25T08:55:36Z\"}},\"lastState\": {},\"ready\": true,\"restartCount\": 5,\"image\": \"\",\"imageID\": \"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\": \"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\": true}]}}"
+	updateObjJson := "{\"metadata\": {\"name\": \"testdemo2-5c86748464-26crb\",\"namespace\": \"test-ns\",\"resourceVersion\": \"47374698\"},\"spec\": {\"containers\": [{\"name\": \"testdemo2\",\"ports\": [{\"containerPort\": 9001,\"protocol\": \"TCP\"}]}]},\"status\": {\"phase\": \"Running\",\"podIP\": \"192.168.136.210\",\"containerStatuses\": [{\"name\": \"testdemo2\",\"state\": {\"terminated\": {\"exitCode\": 143,\"reason\": \"Error\",\"startedAt\": \"2022-05-25T08:55:36Z\",\"finishedAt\": \"2022-06-06T09:04:12Z\",\"containerID\": \"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\"}},\"lastState\": {},\"ready\": false,\"restartCount\": 5,\"image\": \"\",\"imageID\": \"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\": \"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\": false}]}}"
+	addObj := new(corev1.Pod)
+	err := json.Unmarshal([]byte(addObjJson), addObj)
+	if err != nil {
+		t.Fatalf("error unmarshalling %v", err)
+	}
+	updateObj := new(corev1.Pod)
+	err = json.Unmarshal([]byte(updateObjJson), updateObj)
+	if err != nil {
+		t.Fatalf("error unmarshalling %v", err)
+	}
+	podIp := addObj.Status.PodIP
+	port := addObj.Spec.Containers[0].Ports[0].ContainerPort
+	onAdd(addObj)
+	_, ok := MetaDataCache.GetContainerByIpPort(podIp, uint32(port))
+	if !ok {
+		t.Fatalf("Not found container [%s:%d]", podIp, port)
+	} else {
+		t.Logf("Found container [%s:%d]", podIp, port)
+	}
+	stopCh := make(chan struct{})
+	go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh)
+	OnUpdate(addObj, updateObj)
+	time.Sleep(600 * time.Millisecond)
+	_, ok = MetaDataCache.GetContainerByIpPort(podIp, uint32(port))
+	if !ok {
+		t.Errorf("Not found container [%s:%d]", podIp, port)
+	} else {
+		t.Logf("Found container [%s:%d]", podIp, port)
+	}
+	
stopCh <- struct{}{} +} From ad16fbf3449aac18109a1c8dcd2051e8f8e01180 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 9 Jun 2022 11:08:36 +0800 Subject: [PATCH 2/7] replace PodInfo with k8sPodInfo at the globalPodInfo Signed-off-by: Daxin Wang --- collector/metadata/kubernetes/k8scache.go | 7 +- collector/metadata/kubernetes/pod_watch.go | 71 ++++++++----------- .../metadata/kubernetes/pod_watch_test.go | 6 +- .../metadata/kubernetes/service_watch_test.go | 9 +-- 4 files changed, 43 insertions(+), 50 deletions(-) diff --git a/collector/metadata/kubernetes/k8scache.go b/collector/metadata/kubernetes/k8scache.go index 545e7b431..80c0e9e29 100644 --- a/collector/metadata/kubernetes/k8scache.go +++ b/collector/metadata/kubernetes/k8scache.go @@ -14,8 +14,11 @@ type K8sContainerInfo struct { } type K8sPodInfo struct { - Ip string - PodName string + Ip string + PodName string + Ports []int32 + ContainerIds []string + Labels map[string]string // TODO: There may be multiple kinds of workload or services for the same pod WorkloadKind string WorkloadName string diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go index be68f774f..23432871d 100644 --- a/collector/metadata/kubernetes/pod_watch.go +++ b/collector/metadata/kubernetes/pod_watch.go @@ -17,19 +17,10 @@ import ( _ "k8s.io/client-go/util/homedir" ) -type PodInfo struct { - Ip string - Ports []int32 - Name string - Labels map[string]string - Namespace string - ContainerIds []string -} - type podMap struct { // namespace: // podName: podInfo{} - Info map[string]map[string]*PodInfo + Info map[string]map[string]*K8sPodInfo mutex sync.RWMutex } @@ -38,18 +29,18 @@ var podUpdateMutex sync.Mutex func newPodMap() *podMap { return &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), mutex: sync.RWMutex{}, } } -func (m *podMap) add(info *PodInfo) { +func (m *podMap) add(info *K8sPodInfo) { m.mutex.Lock() podInfoMap, ok := m.Info[info.Namespace] if !ok { - podInfoMap = make(map[string]*PodInfo) + podInfoMap = make(map[string]*K8sPodInfo) } - podInfoMap[info.Name] = info + podInfoMap[info.PodName] = info m.Info[info.Namespace] = podInfoMap m.mutex.Unlock() } @@ -63,7 +54,7 @@ func (m *podMap) delete(namespace string, name string) { m.mutex.Unlock() } -func (m *podMap) get(namespace string, name string) (*PodInfo, bool) { +func (m *podMap) get(namespace string, name string) (*K8sPodInfo, bool) { m.mutex.RLock() defer m.mutex.RUnlock() podInfoMap, ok := m.Info[namespace] @@ -77,10 +68,10 @@ func (m *podMap) get(namespace string, name string) (*PodInfo, bool) { return podInfo, true } -// getPodsMatchSelectors gets PodInfo(s) whose labels match with selectors in such namespace. +// getPodsMatchSelectors gets K8sPodInfo(s) whose labels match with selectors in such namespace. // Return empty slice if not found. Note there may be multiple match. 
-func (m *podMap) getPodsMatchSelectors(namespace string, selectors map[string]string) []*PodInfo { - retPodInfoSlice := make([]*PodInfo, 0) +func (m *podMap) getPodsMatchSelectors(namespace string, selectors map[string]string) []*K8sPodInfo { + retPodInfoSlice := make([]*K8sPodInfo, 0) if len(selectors) == 0 { return retPodInfoSlice } @@ -126,18 +117,12 @@ func PodWatch(clientSet *kubernetes.Clientset, graceDeletePeriod time.Duration) func onAdd(obj interface{}) { pod := obj.(*corev1.Pod) - pI := PodInfo{ - Ip: pod.Status.PodIP, - Ports: make([]int32, 0, 1), - Name: pod.Name, - Labels: pod.Labels, - Namespace: pod.Namespace, - ContainerIds: make([]string, 0, 2), - } - - workloadTypeTmp := "" - workloadNameTmp := "" + // Find the controller workload of the pod + var ( + workloadTypeTmp string + workloadNameTmp string + ) rsUpdateMutex.RLock() for _, owner := range pod.OwnerReferences { // only care about the controller @@ -162,7 +147,8 @@ func onAdd(obj interface{}) { } rsUpdateMutex.RUnlock() - serviceInfoSlice := globalServiceInfo.GetServiceMatchLabels(pI.Namespace, pI.Labels) + // Find one of the services of the pod + serviceInfoSlice := globalServiceInfo.GetServiceMatchLabels(pod.Namespace, pod.Labels) var serviceInfo *K8sServiceInfo if len(serviceInfoSlice) == 0 { serviceInfo = nil @@ -177,10 +163,14 @@ func onAdd(obj interface{}) { // Only one of the matched services is cared, here we get the first one serviceInfo = serviceInfoSlice[0] } - var kpi = &K8sPodInfo{ + + var cachePodInfo = &K8sPodInfo{ Ip: pod.Status.PodIP, Namespace: pod.Namespace, PodName: pod.Name, + Ports: make([]int32, 0, 1), + ContainerIds: make([]string, 0, 2), + Labels: pod.Labels, WorkloadKind: workloadTypeTmp, WorkloadName: workloadNameTmp, NodeName: pod.Spec.NodeName, @@ -191,18 +181,17 @@ func onAdd(obj interface{}) { // Add containerId map for _, containerStatus := range pod.Status.ContainerStatuses { - containerId := containerStatus.ContainerID - realContainerId := TruncateContainerId(containerId) - if realContainerId == "" { + shortenContainerId := TruncateContainerId(containerStatus.ContainerID) + if shortenContainerId == "" { continue } - pI.ContainerIds = append(pI.ContainerIds, realContainerId) + cachePodInfo.ContainerIds = append(cachePodInfo.ContainerIds, shortenContainerId) containerInfo := &K8sContainerInfo{ - ContainerId: realContainerId, + ContainerId: shortenContainerId, Name: containerStatus.Name, - RefPodInfo: kpi, + RefPodInfo: cachePodInfo, } - MetaDataCache.AddByContainerId(realContainerId, containerInfo) + MetaDataCache.AddByContainerId(shortenContainerId, containerInfo) } // Add pod IP and port map @@ -211,7 +200,7 @@ func onAdd(obj interface{}) { containerInfo := &K8sContainerInfo{ Name: tmpContainer.Name, HostPortMap: make(map[int32]int32), - RefPodInfo: kpi, + RefPodInfo: cachePodInfo, } // Not specifying a port DOES NOT prevent that port from being exposed. // So Ports could be empty, if so we only record its IP address. 
@@ -227,7 +216,7 @@ func onAdd(obj interface{}) { continue } for _, port := range tmpContainer.Ports { - pI.Ports = append(pI.Ports, port.ContainerPort) + cachePodInfo.Ports = append(cachePodInfo.Ports, port.ContainerPort) // If hostPort is specified, add the container using HostIP and HostPort if port.HostPort != 0 { containerInfo.HostPortMap[port.HostPort] = port.ContainerPort @@ -237,7 +226,7 @@ func onAdd(obj interface{}) { } } } - globalPodInfo.add(&pI) + globalPodInfo.add(cachePodInfo) } func OnUpdate(objOld interface{}, objNew interface{}) { diff --git a/collector/metadata/kubernetes/pod_watch_test.go b/collector/metadata/kubernetes/pod_watch_test.go index f97a64fad..123b8084b 100644 --- a/collector/metadata/kubernetes/pod_watch_test.go +++ b/collector/metadata/kubernetes/pod_watch_test.go @@ -32,7 +32,7 @@ func TestTruncateContainerId(t *testing.T) { func TestOnAdd(t *testing.T) { globalPodInfo = &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), } globalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), @@ -56,7 +56,7 @@ func TestOnAdd(t *testing.T) { // ISSUE https://github.com/CloudDectective-Harmonycloud/kindling/issues/229 func TestOnAddPodWhileReplicaSetUpdating(t *testing.T) { globalPodInfo = &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), } globalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), @@ -98,7 +98,7 @@ func TestOnAddPodWhileReplicaSetUpdating(t *testing.T) { func TestOnAddLowercaseWorkload(t *testing.T) { globalPodInfo = &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), } globalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), diff --git a/collector/metadata/kubernetes/service_watch_test.go b/collector/metadata/kubernetes/service_watch_test.go index b5202c332..cc588839a 100644 --- a/collector/metadata/kubernetes/service_watch_test.go +++ b/collector/metadata/kubernetes/service_watch_test.go @@ -1,10 +1,11 @@ package kubernetes import ( - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "reflect" "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestSelectorsMatchLabels(t *testing.T) { @@ -116,7 +117,7 @@ func TestServiceMap_GetServiceMatchLabels(t *testing.T) { func TestOnAddService(t *testing.T) { globalPodInfo = &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), } globalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), @@ -139,7 +140,7 @@ func TestOnAddService(t *testing.T) { func TestServiceMap_Delete(t *testing.T) { globalPodInfo = &podMap{ - Info: make(map[string]map[string]*PodInfo), + Info: make(map[string]map[string]*K8sPodInfo), } globalServiceInfo = &ServiceMap{ ServiceMap: make(map[string]map[string]*K8sServiceInfo), From 18547daf3e01ab437e9ec99def97a3d28143a926 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 9 Jun 2022 17:08:41 +0800 Subject: [PATCH 3/7] record which part of pod should be deleted in the cache Signed-off-by: Daxin Wang --- collector/metadata/kubernetes/pod_delete.go | 54 ++++++++++--------- .../metadata/kubernetes/pod_delete_test.go | 14 ++--- collector/metadata/kubernetes/pod_watch.go | 39 ++++++++++++-- 3 files changed, 69 insertions(+), 38 deletions(-) diff --git a/collector/metadata/kubernetes/pod_delete.go 
b/collector/metadata/kubernetes/pod_delete.go index 0910aa184..95c20d58a 100644 --- a/collector/metadata/kubernetes/pod_delete.go +++ b/collector/metadata/kubernetes/pod_delete.go @@ -1,12 +1,12 @@ // Copyright 2020 OpenTelemetry Authors // Source: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/k8sattributesprocessor/internal/kube/client.go +// Modification: Use deletedPodInfo as deleted elements and delete our cache map as needed. + package kubernetes import ( "sync" "time" - - corev1 "k8s.io/api/core/v1" ) var ( @@ -15,8 +15,18 @@ var ( ) type deleteRequest struct { - pod *corev1.Pod - ts time.Time + podInfo *deletedPodInfo + ts time.Time +} + +type deletedPodInfo struct { + name string + namespace string + containerIds []string + ip string + ports []int32 + hostIp string + hostPorts []int32 } // deleteLoop deletes pods from cache periodically. @@ -40,7 +50,7 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha podDeleteQueue = podDeleteQueue[cutoff:] podDeleteQueueMut.Unlock() for _, d := range toDelete { - deletePod(d.pod) + deletePodInfo(d.podInfo) } case <-stopCh: @@ -49,28 +59,24 @@ func podDeleteLoop(interval time.Duration, gracePeriod time.Duration, stopCh cha } } -func deletePod(pod *corev1.Pod) { - for i := 0; i < len(pod.Status.ContainerStatuses); i++ { - containerId := pod.Status.ContainerStatuses[i].ContainerID - realContainerId := TruncateContainerId(containerId) - if realContainerId == "" { - continue - } - MetaDataCache.DeleteByContainerId(realContainerId) +func deletePodInfo(podInfo *deletedPodInfo) { + if podInfo.name != "" { + globalPodInfo.delete(podInfo.namespace, podInfo.name) } - - for _, container := range pod.Spec.Containers { - if len(container.Ports) == 0 { - MetaDataCache.DeleteContainerByIpPort(pod.Status.PodIP, 0) - continue + if len(podInfo.containerIds) != 0 { + for i := 0; i < len(podInfo.containerIds); i++ { + MetaDataCache.DeleteByContainerId(podInfo.containerIds[i]) } - for _, port := range container.Ports { + } + if podInfo.ip != "" && len(podInfo.ports) != 0 { + for _, port := range podInfo.ports { // Assume that PodIP:Port can't be reused in a few seconds - MetaDataCache.DeleteContainerByIpPort(pod.Status.PodIP, uint32(port.ContainerPort)) - // If hostPort is specified, add the container using HostIP and HostPort - if port.HostPort != 0 { - MetaDataCache.DeleteContainerByHostIpPort(pod.Status.HostIP, uint32(port.HostPort)) - } + MetaDataCache.DeleteContainerByIpPort(podInfo.ip, uint32(port)) + } + } + if podInfo.hostIp != "" && len(podInfo.hostPorts) != 0 { + for _, port := range podInfo.hostPorts { + MetaDataCache.DeleteContainerByHostIpPort(podInfo.hostIp, uint32(port)) } } } diff --git a/collector/metadata/kubernetes/pod_delete_test.go b/collector/metadata/kubernetes/pod_delete_test.go index 397019d73..855725918 100644 --- a/collector/metadata/kubernetes/pod_delete_test.go +++ b/collector/metadata/kubernetes/pod_delete_test.go @@ -8,20 +8,12 @@ import ( func TestDeleteLoop(t *testing.T) { pod := CreatePod(true) onAdd(pod) - _, ok := globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7") - if !ok { - t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", true, ok) - } verifyIfPodExist(true, t) if len(podDeleteQueue) != 0 { t.Fatalf("PodDeleteQueue should be 0, but is %d", len(podDeleteQueue)) } onDelete(pod) - _, ok = globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7") - if ok { - t.Fatalf("Finding pod at globalPodInfo. 
Expect %v, but get %v", false, ok)
-	}
 	verifyIfPodExist(true, t)
 	if len(podDeleteQueue) != 1 {
 		t.Fatalf("PodDeleteQueue should be 1, but is %d", len(podDeleteQueue))
@@ -52,7 +44,11 @@
 }
 
 func verifyIfPodExist(exist bool, t *testing.T) {
-	_, ok := MetaDataCache.GetByContainerId("1a2b3c4d5e6f")
+	_, ok := globalPodInfo.get("CustomNamespace", "deploy-1a2b3c4d-5e6f7")
+	if ok != exist {
+		t.Fatalf("Finding pod at globalPodInfo. Expect %v, but get %v", exist, ok)
+	}
+	_, ok = MetaDataCache.GetByContainerId("1a2b3c4d5e6f")
 	if ok != exist {
 		t.Errorf("Finding container using containerid. Expect %v, but get %v", exist, ok)
 	}
diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go
index 23432871d..9eb158bf5 100644
--- a/collector/metadata/kubernetes/pod_watch.go
+++ b/collector/metadata/kubernetes/pod_watch.go
@@ -207,7 +207,7 @@ func onAdd(obj interface{}) {
 		if len(tmpContainer.Ports) == 0 {
 			// If there is more than one container that doesn't specify a port,
 			// we would rather get an empty name than get an incorrect one.
-			if len(pod.Spec.Containers) > 0 {
+			if len(pod.Spec.Containers) > 1 {
 				containerInfo.Name = ""
 			}
 			// When there are many containers in one pod and only part of them have ports,
@@ -246,13 +246,42 @@ func OnUpdate(objOld interface{}, objNew interface{}) {
 func onDelete(obj interface{}) {
 	pod := obj.(*corev1.Pod)
-	// Delete the intermediate data first
-	globalPodInfo.delete(pod.Namespace, pod.Name)
+	podInfo := &deletedPodInfo{
+		name:         pod.Name,
+		namespace:    pod.Namespace,
+		containerIds: make([]string, 0),
+		ip:           pod.Status.PodIP,
+		ports:        make([]int32, 0),
+		hostIp:       pod.Status.HostIP,
+		hostPorts:    make([]int32, 0),
+	}
+
+	for _, containerStatus := range pod.Status.ContainerStatuses {
+		shortenContainerId := TruncateContainerId(containerStatus.ContainerID)
+		if shortenContainerId == "" {
+			continue
+		}
+		podInfo.containerIds = append(podInfo.containerIds, shortenContainerId)
+	}
+
+	for _, container := range pod.Spec.Containers {
+		if len(container.Ports) == 0 {
+			podInfo.ports = append(podInfo.ports, 0)
+			continue
+		}
+		for _, port := range container.Ports {
+			podInfo.ports = append(podInfo.ports, port.ContainerPort)
+			// If hostPort is specified, add the container using HostIP and HostPort
+			if port.HostPort != 0 {
+				podInfo.hostPorts = append(podInfo.hostPorts, port.HostPort)
+			}
+		}
+	}
 	// Wait for a few seconds to remove the cache data
 	podDeleteQueueMut.Lock()
 	podDeleteQueue = append(podDeleteQueue, deleteRequest{
-		pod: pod,
-		ts:  time.Now(),
+		podInfo: podInfo,
+		ts:      time.Now(),
 	})
 	podDeleteQueueMut.Unlock()
 }

From aca29551d444dff95695498c11ce83488b862fe6 Mon Sep 17 00:00:00 2001
From: Daxin Wang
Date: Thu, 9 Jun 2022 21:19:54 +0800
Subject: [PATCH 4/7] fix onUpdate bug

Signed-off-by: Daxin Wang
---
 collector/metadata/kubernetes/k8scache.go  |   1 +
 collector/metadata/kubernetes/pod_watch.go | 129 ++++++++++++++++-----
 collector/pkg/compare/int32_slice.go       |  49 ++++++++
 collector/pkg/compare/slice_test.go        |  29 +++++
 collector/pkg/compare/string_slice.go      |  49 ++++++++
 5 files changed, 226 insertions(+), 31 deletions(-)
 create mode 100644 collector/pkg/compare/int32_slice.go
 create mode 100644 collector/pkg/compare/slice_test.go
 create mode 100644 collector/pkg/compare/string_slice.go

diff --git a/collector/metadata/kubernetes/k8scache.go b/collector/metadata/kubernetes/k8scache.go
index 80c0e9e29..c13843aa2 100644
--- a/collector/metadata/kubernetes/k8scache.go
+++ 
b/collector/metadata/kubernetes/k8scache.go @@ -17,6 +17,7 @@ type K8sPodInfo struct { Ip string PodName string Ports []int32 + HostPorts []int32 ContainerIds []string Labels map[string]string // TODO: There may be multiple kinds of workload or services for the same pod diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go index 9eb158bf5..80bf19252 100644 --- a/collector/metadata/kubernetes/pod_watch.go +++ b/collector/metadata/kubernetes/pod_watch.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/Kindling-project/kindling/collector/pkg/compare" corev1 "k8s.io/api/core/v1" _ "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -25,7 +26,6 @@ type podMap struct { } var globalPodInfo = newPodMap() -var podUpdateMutex sync.Mutex func newPodMap() *podMap { return &podMap{ @@ -119,32 +119,8 @@ func onAdd(obj interface{}) { pod := obj.(*corev1.Pod) // Find the controller workload of the pod - var ( - workloadTypeTmp string - workloadNameTmp string - ) rsUpdateMutex.RLock() - for _, owner := range pod.OwnerReferences { - // only care about the controller - if owner.Controller == nil || *owner.Controller != true { - continue - } - // TODO: recursion method to find the controller - if owner.Kind == ReplicaSetKind { - // The owner of Pod is ReplicaSet, and it is Workload such as Deployment for ReplicaSet. - // Therefore, find ReplicaSet's name in 'globalRsInfo' to find which kind of workload - // the Pod belongs to. - if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok { - workloadTypeTmp = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind)) - workloadNameTmp = workload.Name - break - } - } - // If the owner of pod is not ReplicaSet or the replicaset has no controller - workloadTypeTmp = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind)) - workloadNameTmp = owner.Name - break - } + workloadTypeTmp, workloadNameTmp := getControllerKindName(pod) rsUpdateMutex.RUnlock() // Find one of the services of the pod @@ -168,7 +144,8 @@ func onAdd(obj interface{}) { Ip: pod.Status.PodIP, Namespace: pod.Namespace, PodName: pod.Name, - Ports: make([]int32, 0, 1), + Ports: make([]int32, 0), + HostPorts: make([]int32, 0), ContainerIds: make([]string, 0, 2), Labels: pod.Labels, WorkloadKind: workloadTypeTmp, @@ -213,6 +190,7 @@ func onAdd(obj interface{}) { // When there are many containers in one pod and only part of them have ports, // the containers at the back will overwrite the ones at the front here. 
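			// Note: port 0 is used as a placeholder key for containers that declare
			// no ports, and onDelete records the same 0 in deletedPodInfo.ports, so
			// the delayed delete can later remove the matching (PodIP, 0) entry.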
MetaDataCache.AddContainerByIpPort(pod.Status.PodIP, 0, containerInfo) + cachePodInfo.Ports = append(cachePodInfo.Ports, 0) continue } for _, port := range tmpContainer.Ports { @@ -220,6 +198,7 @@ func onAdd(obj interface{}) { // If hostPort is specified, add the container using HostIP and HostPort if port.HostPort != 0 { containerInfo.HostPortMap[port.HostPort] = port.ContainerPort + cachePodInfo.HostPorts = append(cachePodInfo.HostPorts, port.HostPort) MetaDataCache.AddContainerByHostIpPort(pod.Status.HostIP, uint32(port.HostPort), containerInfo) } MetaDataCache.AddContainerByIpPort(pod.Status.PodIP, uint32(port.ContainerPort), containerInfo) @@ -229,6 +208,30 @@ func onAdd(obj interface{}) { globalPodInfo.add(cachePodInfo) } +func getControllerKindName(pod *corev1.Pod) (workloadKind string, workloadName string) { + for _, owner := range pod.OwnerReferences { + // only care about the controller + if owner.Controller == nil || *owner.Controller != true { + continue + } + if owner.Kind == ReplicaSetKind { + // The owner of Pod is ReplicaSet, and it is Workload such as Deployment for ReplicaSet. + // Therefore, find ReplicaSet's name in 'globalRsInfo' to find which kind of workload + // the Pod belongs to. + if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok { + workloadKind = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind)) + workloadName = workload.Name + return + } + } + // If the owner of pod is not ReplicaSet or the replicaset has no controller + workloadKind = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind)) + workloadName = owner.Name + return + } + return +} + func OnUpdate(objOld interface{}, objNew interface{}) { oldPod := objOld.(*corev1.Pod) newPod := objNew.(*corev1.Pod) @@ -237,11 +240,75 @@ func OnUpdate(objOld interface{}, objNew interface{}) { // Two different versions of the same pod will always have different RVs. return } - podUpdateMutex.Lock() - // TODO: re-implement the updated logic to reduce computation - onDelete(objOld) + + oldCachePod, ok := globalPodInfo.get(oldPod.Namespace, oldPod.Name) + if !ok { + onAdd(objNew) + return + } + // Always override the old pod in the cache onAdd(objNew) - podUpdateMutex.Unlock() + + // Delay delete the pod using the difference between the old pod and the new one + deletedPodInfo := &deletedPodInfo{ + name: "", + namespace: oldPod.Namespace, + containerIds: nil, + ip: oldPod.Status.PodIP, + ports: nil, + hostIp: oldPod.Status.HostIP, + hostPorts: nil, + } + + if oldPod.Name != newPod.Name { + deletedPodInfo.name = oldPod.Name + } + + // Check the containers' ID + newContainerIds := make([]string, 0) + for _, containerStatus := range oldPod.Status.ContainerStatuses { + shortenContainerId := TruncateContainerId(containerStatus.ContainerID) + if shortenContainerId == "" { + continue + } + newContainerIds = append(newContainerIds, shortenContainerId) + } + containerIdCompare := compare.NewStringSlice(oldCachePod.ContainerIds, newContainerIds) + containerIdCompare.Compare() + deletedPodInfo.containerIds = containerIdCompare.GetRemovedElements() + + // Check the ports specified. 
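	// The slices below are built the same way onAdd fills cachePodInfo.Ports
	// (0 stands for a container with no declared ports), so the compare step
	// sees identical encodings and only entries that truly disappeared are
	// scheduled for deletion.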
+ newPorts := make([]int32, 0) + newHostPorts := make([]int32, 0) + for _, container := range newPod.Spec.Containers { + if len(container.Ports) == 0 { + newPorts = append(newPorts, 0) + continue + } + for _, port := range container.Ports { + newPorts = append(newPorts, port.ContainerPort) + // If hostPort is specified, add the container using HostIP and HostPort + if port.HostPort != 0 { + newHostPorts = append(newHostPorts, port.HostPort) + } + } + } + + portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts) + portsCompare.Compare() + deletedPodInfo.ports = portsCompare.GetRemovedElements() + + hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts) + hostPortsCompare.Compare() + deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements() + + // Wait for a few seconds to remove the cache data + podDeleteQueueMut.Lock() + podDeleteQueue = append(podDeleteQueue, deleteRequest{ + podInfo: deletedPodInfo, + ts: time.Now(), + }) + podDeleteQueueMut.Unlock() } func onDelete(obj interface{}) { diff --git a/collector/pkg/compare/int32_slice.go b/collector/pkg/compare/int32_slice.go new file mode 100644 index 000000000..50eadae3d --- /dev/null +++ b/collector/pkg/compare/int32_slice.go @@ -0,0 +1,49 @@ +package compare + +type Int32Slice struct { + oldElements []int32 + oldFlags []bool + newElements []int32 + newFlags []bool +} + +func NewInt32Slice(oldElements []int32, newElements []int32) Int32Slice { + return Int32Slice{ + oldElements: oldElements, + oldFlags: make([]bool, len(oldElements)), + newElements: newElements, + newFlags: make([]bool, len(newElements)), + } +} + +func (c *Int32Slice) Compare() { + // The elements could be repeated, so we must iterate all the elements. + for i, newElement := range c.newElements { + for j, oldElement := range c.oldElements { + if oldElement == newElement { + c.newFlags[i] = true + c.oldFlags[j] = true + } + } + } +} + +func (c *Int32Slice) GetRemovedElements() []int32 { + ret := make([]int32, 0) + for i, flag := range c.oldFlags { + if !flag { + ret = append(ret, c.oldElements[i]) + } + } + return ret +} + +func (c *Int32Slice) GetAddedElements() []int32 { + ret := make([]int32, 0) + for i, flag := range c.newFlags { + if !flag { + ret = append(ret, c.newElements[i]) + } + } + return ret +} diff --git a/collector/pkg/compare/slice_test.go b/collector/pkg/compare/slice_test.go new file mode 100644 index 000000000..3849ae2f3 --- /dev/null +++ b/collector/pkg/compare/slice_test.go @@ -0,0 +1,29 @@ +package compare + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInt32Slice(t *testing.T) { + oldElements := []int32{0, 0, 1, 2, 3} + newElements := []int32{1, 3, 4, 6} + compareSlice := NewInt32Slice(oldElements, newElements) + compareSlice.Compare() + removedElements := compareSlice.GetRemovedElements() + assert.ElementsMatch(t, removedElements, []int32{0, 0, 2}) + addedElements := compareSlice.GetAddedElements() + assert.ElementsMatch(t, addedElements, []int32{4, 6}) +} + +func TestStringSlice(t *testing.T) { + oldElements := []string{"a", "b", "c"} + newElements := []string{"d", "e", "f"} + compareSlice := NewStringSlice(oldElements, newElements) + compareSlice.Compare() + removedElements := compareSlice.GetRemovedElements() + assert.ElementsMatch(t, removedElements, []string{"a", "b", "c"}) + addedElements := compareSlice.GetAddedElements() + assert.ElementsMatch(t, addedElements, []string{"d", "e", "f"}) +} diff --git a/collector/pkg/compare/string_slice.go 
b/collector/pkg/compare/string_slice.go new file mode 100644 index 000000000..52e40bd80 --- /dev/null +++ b/collector/pkg/compare/string_slice.go @@ -0,0 +1,49 @@ +package compare + +type StringSlice struct { + oldElements []string + oldFlags []bool + newElements []string + newFlags []bool +} + +func NewStringSlice(oldElements []string, newElements []string) StringSlice { + return StringSlice{ + oldElements: oldElements, + oldFlags: make([]bool, len(oldElements)), + newElements: newElements, + newFlags: make([]bool, len(newElements)), + } +} + +func (c *StringSlice) Compare() { + // The elements could be repeated, so we must iterate all the elements. + for i, newElement := range c.newElements { + for j, oldElement := range c.oldElements { + if oldElement == newElement { + c.newFlags[i] = true + c.oldFlags[j] = true + } + } + } +} + +func (c *StringSlice) GetRemovedElements() []string { + ret := make([]string, 0) + for i, flag := range c.oldFlags { + if !flag { + ret = append(ret, c.oldElements[i]) + } + } + return ret +} + +func (c *StringSlice) GetAddedElements() []string { + ret := make([]string, 0) + for i, flag := range c.newFlags { + if !flag { + ret = append(ret, c.newElements[i]) + } + } + return ret +} From 6dda9ed5a4d2de66ac75fffd35a6193b8b8d344c Mon Sep 17 00:00:00 2001 From: niejiangang Date: Fri, 10 Jun 2022 15:42:12 +0800 Subject: [PATCH 5/7] test: update TestUpdateAndDelayDelete TestCase Signed-off-by: niejiangang --- collector/metadata/kubernetes/pod_watch.go | 2 +- .../metadata/kubernetes/pod_watch_test.go | 42 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go index 80bf19252..d72b5d617 100644 --- a/collector/metadata/kubernetes/pod_watch.go +++ b/collector/metadata/kubernetes/pod_watch.go @@ -266,7 +266,7 @@ func OnUpdate(objOld interface{}, objNew interface{}) { // Check the containers' ID newContainerIds := make([]string, 0) - for _, containerStatus := range oldPod.Status.ContainerStatuses { + for _, containerStatus := range newPod.Status.ContainerStatuses { shortenContainerId := TruncateContainerId(containerStatus.ContainerID) if shortenContainerId == "" { continue diff --git a/collector/metadata/kubernetes/pod_watch_test.go b/collector/metadata/kubernetes/pod_watch_test.go index 123b8084b..1d6a5fe0f 100644 --- a/collector/metadata/kubernetes/pod_watch_test.go +++ b/collector/metadata/kubernetes/pod_watch_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -195,9 +196,9 @@ func CreatePod(hasPort bool) *corev1.Pod { return pod } -func TestUpdateDelayDelete(t *testing.T) { - addObjJson := "{\"metadata\": {\"name\": \"testdemo2-5c86748464-26crb\",\"namespace\": \"test-ns\",\"resourceVersion\": \"44895976\"},\"spec\": {\"containers\": [{\"name\": \"testdemo2\",\"ports\": [{\"containerPort\": 9001,\"protocol\": \"TCP\"}]}]},\"status\": {\"phase\": \"Running\",\"podIP\": \"192.168.136.210\",\"containerStatuses\": [{\"name\": \"testdemo2\",\"state\": {\"running\": {\"startedAt\": \"2022-05-25T08:55:36Z\"}},\"lastState\": {},\"ready\": true,\"restartCount\": 5,\"image\": \"\",\"imageID\": \"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\": 
\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\": true}]}}" - updateObjJson := "{\"metadata\": {\"name\": \"testdemo2-5c86748464-26crb\",\"namespace\": \"test-ns\",\"resourceVersion\": \"47374698\"},\"spec\": {\"containers\": [{\"name\": \"testdemo2\",\"ports\": [{\"containerPort\": 9001,\"protocol\": \"TCP\"}]}]},\"status\": {\"phase\": \"Running\",\"podIP\": \"192.168.136.210\",\"containerStatuses\": [{\"name\": \"testdemo2\",\"state\": {\"terminated\": {\"exitCode\": 143,\"reason\": \"Error\",\"startedAt\": \"2022-05-25T08:55:36Z\",\"finishedAt\": \"2022-06-06T09:04:12Z\",\"containerID\": \"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\"}},\"lastState\": {},\"ready\": false,\"restartCount\": 5,\"image\": \"\",\"imageID\": \"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\": \"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\": false}]}}" +func TestUpdateAndDelayDelete(t *testing.T) { + addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.102\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" addObj := new(corev1.Pod) err := json.Unmarshal([]byte(addObjJson), addObj) if err != nil { @@ -220,12 +221,33 @@ func TestUpdateDelayDelete(t *testing.T) { stopCh := make(chan struct{}) go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) OnUpdate(addObj, updateObj) - time.Sleep(600 * time.Millisecond) - _, ok = MetaDataCache.GetContainerByIpPort(podIp, uint32(port)) - if !ok { - t.Errorf("Not found container [%s:%d]", podIp, port) - } else { - t.Logf("Found container [%s:%d]", podIp, port) - } + + // Check if new Container can be find + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) + 
assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Wait for deletes + time.Sleep(1000 * time.Millisecond) + + // Double Check for NewContainer + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) + assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Check the old Container has been delete + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) + assert.False(t, find, "OldContainer IP should be deleted in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(addObj.Status.HostIP, uint32(port)) + assert.False(t, find, "OldHostIp Port should be deleted in MetaDataCache") + stopCh <- struct{}{} } From cf489a58223e6815a306ba74bc84faccc1b02314 Mon Sep 17 00:00:00 2001 From: niejiangang Date: Fri, 10 Jun 2022 16:47:13 +0800 Subject: [PATCH 6/7] fix: remove all old IpPortInfo if PodIp changed Signed-off-by: niejiangang --- collector/metadata/kubernetes/pod_watch.go | 20 +++-- .../metadata/kubernetes/pod_watch_test.go | 85 +++++++++++++++++++ 2 files changed, 99 insertions(+), 6 deletions(-) diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go index d72b5d617..1d8d6e27c 100644 --- a/collector/metadata/kubernetes/pod_watch.go +++ b/collector/metadata/kubernetes/pod_watch.go @@ -294,13 +294,21 @@ func OnUpdate(objOld interface{}, objNew interface{}) { } } - portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts) - portsCompare.Compare() - deletedPodInfo.ports = portsCompare.GetRemovedElements() + if oldPod.Status.PodIP != newPod.Status.PodIP { + deletedPodInfo.ports = oldCachePod.Ports + } else { + portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts) + portsCompare.Compare() + deletedPodInfo.ports = portsCompare.GetRemovedElements() + } - hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts) - hostPortsCompare.Compare() - deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements() + if oldPod.Status.HostIP != newPod.Status.HostIP { + deletedPodInfo.hostPorts = oldCachePod.Ports + } else { + hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts) + hostPortsCompare.Compare() + deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements() + } // Wait for a few seconds to remove the cache data podDeleteQueueMut.Lock() diff --git a/collector/metadata/kubernetes/pod_watch_test.go b/collector/metadata/kubernetes/pod_watch_test.go index 1d6a5fe0f..68dd8b5e4 100644 --- a/collector/metadata/kubernetes/pod_watch_test.go +++ b/collector/metadata/kubernetes/pod_watch_test.go @@ -251,3 +251,88 @@ func TestUpdateAndDelayDelete(t *testing.T) { stopCh <- struct{}{} } + +func TestUpdateAndDelayDeleteWhenOnlyPodIpChanged(t *testing.T) { + addObjJson := 
"{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + addObj := new(corev1.Pod) + json.Unmarshal([]byte(addObjJson), addObj) + updateObj := new(corev1.Pod) + json.Unmarshal([]byte(updateObjJson), updateObj) + port := addObj.Spec.Containers[0].Ports[0].ContainerPort + onAdd(addObj) + stopCh := make(chan struct{}) + go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) + OnUpdate(addObj, updateObj) + + // Check if new Container can be find + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) + assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Wait for deletes + time.Sleep(1000 * time.Millisecond) + + // Double Check for NewContainer + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port)) + assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Check the old Container has been delete + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) + assert.False(t, find, "OldContainer IP should be deleted in MetaDataCache") + + stopCh <- struct{}{} +} + +func 
TestUpdateAndDelayDeleteWhenOnlyPortChanged(t *testing.T) { + addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9002,\"protocol\":\"TCP\",\"hostPort\":9002}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}" + addObj := new(corev1.Pod) + json.Unmarshal([]byte(addObjJson), addObj) + updateObj := new(corev1.Pod) + json.Unmarshal([]byte(updateObjJson), updateObj) + port := addObj.Spec.Containers[0].Ports[0].ContainerPort + newPort := updateObj.Spec.Containers[0].Ports[0].ContainerPort + onAdd(addObj) + stopCh := make(chan struct{}) + go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh) + OnUpdate(addObj, updateObj) + + // Check if new Container can be find + _, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort)) + assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Wait for deletes + time.Sleep(1000 * time.Millisecond) + + // Double Check for NewContainer + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID)) + assert.True(t, find, "NewContainerId did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort)) + assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort)) + assert.True(t, find, "NewHostIp Port did't find in MetaDataCache") + + // Check the old Container has been delete + _, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID)) + assert.False(t, find, "OldContainerId should be deletedin MetaDataCache") + _, find = 
MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port)) + assert.True(t, find, "If podIp is not changed , Old IP can still be found in MetaDataCache") + _, find = MetaDataCache.GetContainerByHostIpPort(addObj.Status.HostIP, uint32(port)) + assert.False(t, find, "OldHostIp Port should be deleted in MetaDataCache") + + stopCh <- struct{}{} +} From 7cb2d32c8c9d5111be07cef8f4ed9aa1ae42ce8a Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Mon, 13 Jun 2022 10:07:35 +0800 Subject: [PATCH 7/7] fix typo Signed-off-by: Daxin Wang --- collector/metadata/kubernetes/pod_watch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/metadata/kubernetes/pod_watch.go b/collector/metadata/kubernetes/pod_watch.go index 1d8d6e27c..6d63f9006 100644 --- a/collector/metadata/kubernetes/pod_watch.go +++ b/collector/metadata/kubernetes/pod_watch.go @@ -303,7 +303,7 @@ func OnUpdate(objOld interface{}, objNew interface{}) { } if oldPod.Status.HostIP != newPod.Status.HostIP { - deletedPodInfo.hostPorts = oldCachePod.Ports + deletedPodInfo.hostPorts = oldCachePod.HostPorts } else { hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts) hostPortsCompare.Compare()
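Taken together, the series replaces the old "delete everything, then re-add" update path with a diff-and-delay scheme: OnUpdate re-runs onAdd for the new pod so fresh entries are cached immediately, diffs the cached slices against the newly observed ones, and enqueues only the removed elements for podDeleteLoop to purge once the grace period expires. Below is a minimal standalone sketch, not part of the patches, of how the compare package introduced in PATCH 4 drives that decision; the module path is taken from the import in pod_watch.go, and the sample IDs and ports are illustrative only.

package main

import (
	"fmt"

	"github.com/Kindling-project/kindling/collector/pkg/compare"
)

func main() {
	// Container IDs cached for the old pod vs. the IDs reported by the updated pod.
	oldIds := []string{"d505f50edb4e", "aaaabbbbcccc"}
	newIds := []string{"d000f50edb4e", "aaaabbbbcccc"}

	idCompare := compare.NewStringSlice(oldIds, newIds)
	idCompare.Compare()
	// Only the IDs that disappeared belong in deletedPodInfo; shared IDs stay cached.
	fmt.Println(idCompare.GetRemovedElements()) // [d505f50edb4e]
	fmt.Println(idCompare.GetAddedElements())   // [d000f50edb4e]

	// Ports follow the same pattern; 0 stands for a container with no declared ports.
	portCompare := compare.NewInt32Slice([]int32{9001, 0}, []int32{9002, 0})
	portCompare.Compare()
	fmt.Println(portCompare.GetRemovedElements()) // [9001]
}

The added elements need no special handling because re-running onAdd has already cached them, which is why OnUpdate feeds only GetRemovedElements into the delete queue.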