Skip to content

Commit

Permalink
fix: remove all old IpPortInfo if PodIp changed
Browse files Browse the repository at this point in the history
Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>
  • Loading branch information
NeJan2020 committed Jun 10, 2022
1 parent 6dda9ed commit cf489a5
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 6 deletions.
20 changes: 14 additions & 6 deletions collector/metadata/kubernetes/pod_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,21 @@ func OnUpdate(objOld interface{}, objNew interface{}) {
}
}

portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts)
portsCompare.Compare()
deletedPodInfo.ports = portsCompare.GetRemovedElements()
if oldPod.Status.PodIP != newPod.Status.PodIP {
deletedPodInfo.ports = oldCachePod.Ports
} else {
portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts)
portsCompare.Compare()
deletedPodInfo.ports = portsCompare.GetRemovedElements()
}

hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts)
hostPortsCompare.Compare()
deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements()
if oldPod.Status.HostIP != newPod.Status.HostIP {
deletedPodInfo.hostPorts = oldCachePod.Ports
} else {
hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts)
hostPortsCompare.Compare()
deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements()
}

// Wait for a few seconds to remove the cache data
podDeleteQueueMut.Lock()
Expand Down
85 changes: 85 additions & 0 deletions collector/metadata/kubernetes/pod_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,88 @@ func TestUpdateAndDelayDelete(t *testing.T) {

stopCh <- struct{}{}
}

func TestUpdateAndDelayDeleteWhenOnlyPodIpChanged(t *testing.T) {
addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}"
updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.212\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}"
addObj := new(corev1.Pod)
json.Unmarshal([]byte(addObjJson), addObj)
updateObj := new(corev1.Pod)
json.Unmarshal([]byte(updateObjJson), updateObj)
port := addObj.Spec.Containers[0].Ports[0].ContainerPort
onAdd(addObj)
stopCh := make(chan struct{})
go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh)
OnUpdate(addObj, updateObj)

// Check if new Container can be find
_, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID))
assert.True(t, find, "NewContainerId did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port))
assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port))
assert.True(t, find, "NewHostIp Port did't find in MetaDataCache")

// Wait for deletes
time.Sleep(1000 * time.Millisecond)

// Double Check for NewContainer
_, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID))
assert.True(t, find, "NewContainerId did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(port))
assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(port))
assert.True(t, find, "NewHostIp Port did't find in MetaDataCache")

// Check the old Container has been delete
_, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID))
assert.False(t, find, "OldContainerId should be deletedin MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port))
assert.False(t, find, "OldContainer IP should be deleted in MetaDataCache")

stopCh <- struct{}{}
}

func TestUpdateAndDelayDeleteWhenOnlyPortChanged(t *testing.T) {
addObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895976\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9001,\"protocol\":\"TCP\",\"hostPort\":9001}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d505f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}"
updateObjJson := "{\"metadata\":{\"name\":\"testdemo2-5c86748464-26crb\",\"namespace\":\"test-ns\",\"resourceVersion\":\"44895977\"},\"spec\":{\"containers\":[{\"name\":\"testdemo2\",\"ports\":[{\"containerPort\":9002,\"protocol\":\"TCP\",\"hostPort\":9002}]}]},\"status\":{\"phase\":\"Running\",\"podIP\":\"192.168.136.210\",\"hostIP\":\"10.10.10.101\",\"containerStatuses\":[{\"name\":\"testdemo2\",\"state\":{\"running\":{\"startedAt\":\"2022-05-25T08:55:36Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":5,\"image\":\"\",\"imageID\":\"docker-pullable://10.10.102.213:8443/cloudnevro-test/test-netserver@sha256:6720f648b74ed590f36094a1c7a58b01b6881396409784c17f471ecfe445e3fd\",\"containerID\":\"docker://d000f50edb4e204cf31840e3cb8d26d33f212d4ebef994d0c3fc151d57e17413\",\"started\":true}]}}"
addObj := new(corev1.Pod)
json.Unmarshal([]byte(addObjJson), addObj)
updateObj := new(corev1.Pod)
json.Unmarshal([]byte(updateObjJson), updateObj)
port := addObj.Spec.Containers[0].Ports[0].ContainerPort
newPort := updateObj.Spec.Containers[0].Ports[0].ContainerPort
onAdd(addObj)
stopCh := make(chan struct{})
go podDeleteLoop(100*time.Millisecond, 500*time.Millisecond, stopCh)
OnUpdate(addObj, updateObj)

// Check if new Container can be find
_, find := MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID))
assert.True(t, find, "NewContainerId did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort))
assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort))
assert.True(t, find, "NewHostIp Port did't find in MetaDataCache")

// Wait for deletes
time.Sleep(1000 * time.Millisecond)

// Double Check for NewContainer
_, find = MetaDataCache.GetByContainerId(TruncateContainerId(updateObj.Status.ContainerStatuses[0].ContainerID))
assert.True(t, find, "NewContainerId did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(updateObj.Status.PodIP, uint32(newPort))
assert.True(t, find, "NewContainer IP Port did't find in MetaDataCache")
_, find = MetaDataCache.GetContainerByHostIpPort(updateObj.Status.HostIP, uint32(newPort))
assert.True(t, find, "NewHostIp Port did't find in MetaDataCache")

// Check the old Container has been delete
_, find = MetaDataCache.GetByContainerId(TruncateContainerId(addObj.Status.ContainerStatuses[0].ContainerID))
assert.False(t, find, "OldContainerId should be deletedin MetaDataCache")
_, find = MetaDataCache.GetContainerByIpPort(addObj.Status.PodIP, uint32(port))
assert.True(t, find, "If podIp is not changed , Old IP can still be found in MetaDataCache")
_, find = MetaDataCache.GetContainerByHostIpPort(addObj.Status.HostIP, uint32(port))
assert.False(t, find, "OldHostIp Port should be deleted in MetaDataCache")

stopCh <- struct{}{}
}

0 comments on commit cf489a5

Please sign in to comment.