Commit aca2955

fix onUpdate bug

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>
Daxin Wang committed Jun 9, 2022
1 parent 18547da commit aca2955
Showing 5 changed files with 226 additions and 31 deletions.
1 change: 1 addition & 0 deletions collector/metadata/kubernetes/k8scache.go
@@ -17,6 +17,7 @@ type K8sPodInfo struct {
Ip string
PodName string
Ports []int32
+ HostPorts []int32
ContainerIds []string
Labels map[string]string
// TODO: There may be multiple kinds of workload or services for the same pod
129 changes: 98 additions & 31 deletions collector/metadata/kubernetes/pod_watch.go
@@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/compare"
corev1 "k8s.io/api/core/v1"
_ "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
@@ -25,7 +26,6 @@ type podMap struct {
}

var globalPodInfo = newPodMap()
- var podUpdateMutex sync.Mutex

func newPodMap() *podMap {
return &podMap{
@@ -119,32 +119,8 @@ func onAdd(obj interface{}) {
pod := obj.(*corev1.Pod)

// Find the controller workload of the pod
- var (
- workloadTypeTmp string
- workloadNameTmp string
- )
rsUpdateMutex.RLock()
- for _, owner := range pod.OwnerReferences {
- // only care about the controller
- if owner.Controller == nil || *owner.Controller != true {
- continue
- }
- // TODO: recursion method to find the controller
- if owner.Kind == ReplicaSetKind {
- // The owner of Pod is ReplicaSet, and it is Workload such as Deployment for ReplicaSet.
- // Therefore, find ReplicaSet's name in 'globalRsInfo' to find which kind of workload
- // the Pod belongs to.
- if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok {
- workloadTypeTmp = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind))
- workloadNameTmp = workload.Name
- break
- }
- }
- // If the owner of pod is not ReplicaSet or the replicaset has no controller
- workloadTypeTmp = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind))
- workloadNameTmp = owner.Name
- break
- }
+ workloadTypeTmp, workloadNameTmp := getControllerKindName(pod)
rsUpdateMutex.RUnlock()

// Find one of the services of the pod
@@ -168,7 +144,8 @@ func onAdd(obj interface{}) {
Ip: pod.Status.PodIP,
Namespace: pod.Namespace,
PodName: pod.Name,
- Ports: make([]int32, 0, 1),
+ Ports: make([]int32, 0),
+ HostPorts: make([]int32, 0),
ContainerIds: make([]string, 0, 2),
Labels: pod.Labels,
WorkloadKind: workloadTypeTmp,
@@ -213,13 +190,15 @@ func onAdd(obj interface{}) {
// When there are many containers in one pod and only part of them have ports,
// the containers at the back will overwrite the ones at the front here.
MetaDataCache.AddContainerByIpPort(pod.Status.PodIP, 0, containerInfo)
+ cachePodInfo.Ports = append(cachePodInfo.Ports, 0)
continue
}
for _, port := range tmpContainer.Ports {
cachePodInfo.Ports = append(cachePodInfo.Ports, port.ContainerPort)
// If hostPort is specified, add the container using HostIP and HostPort
if port.HostPort != 0 {
containerInfo.HostPortMap[port.HostPort] = port.ContainerPort
+ cachePodInfo.HostPorts = append(cachePodInfo.HostPorts, port.HostPort)
MetaDataCache.AddContainerByHostIpPort(pod.Status.HostIP, uint32(port.HostPort), containerInfo)
}
MetaDataCache.AddContainerByIpPort(pod.Status.PodIP, uint32(port.ContainerPort), containerInfo)
@@ -229,6 +208,30 @@ func onAdd(obj interface{}) {
globalPodInfo.add(cachePodInfo)
}

+ func getControllerKindName(pod *corev1.Pod) (workloadKind string, workloadName string) {
+ for _, owner := range pod.OwnerReferences {
+ // Only care about the controller owner.
+ if owner.Controller == nil || *owner.Controller != true {
+ continue
+ }
+ if owner.Kind == ReplicaSetKind {
+ // The Pod's owner is a ReplicaSet, whose own controller is a workload such as a Deployment.
+ // Therefore, look up the ReplicaSet's name in 'globalRsInfo' to find out which kind of
+ // workload the Pod belongs to.
+ if workload, ok := globalRsInfo.GetOwnerReference(mapKey(pod.Namespace, owner.Name)); ok {
+ workloadKind = CompleteGVK(workload.APIVersion, strings.ToLower(workload.Kind))
+ workloadName = workload.Name
+ return
+ }
+ }
+ // The owner is not a ReplicaSet, or the ReplicaSet has no cached controller of its own.
+ workloadKind = CompleteGVK(owner.APIVersion, strings.ToLower(owner.Kind))
+ workloadName = owner.Name
+ return
+ }
+ return
+ }

func OnUpdate(objOld interface{}, objNew interface{}) {
oldPod := objOld.(*corev1.Pod)
newPod := objNew.(*corev1.Pod)
@@ -237,11 +240,75 @@ func OnUpdate(objOld interface{}, objNew interface{}) {
// Two different versions of the same pod will always have different RVs.
return
}
- podUpdateMutex.Lock()
- // TODO: re-implement the updated logic to reduce computation
- onDelete(objOld)

+ oldCachePod, ok := globalPodInfo.get(oldPod.Namespace, oldPod.Name)
+ if !ok {
+ onAdd(objNew)
+ return
+ }
+ // Always override the old pod in the cache
onAdd(objNew)
- podUpdateMutex.Unlock()

+ // Delay deleting the stale cache entries, computed from the difference between the old pod and the new one
+ deletedPodInfo := &deletedPodInfo{
+ name: "",
+ namespace: oldPod.Namespace,
+ containerIds: nil,
+ ip: oldPod.Status.PodIP,
+ ports: nil,
+ hostIp: oldPod.Status.HostIP,
+ hostPorts: nil,
+ }

+ if oldPod.Name != newPod.Name {
+ deletedPodInfo.name = oldPod.Name
+ }

+ // Check the containers' IDs
+ newContainerIds := make([]string, 0)
+ for _, containerStatus := range newPod.Status.ContainerStatuses {
+ shortenContainerId := TruncateContainerId(containerStatus.ContainerID)
+ if shortenContainerId == "" {
+ continue
+ }
+ newContainerIds = append(newContainerIds, shortenContainerId)
+ }
+ containerIdCompare := compare.NewStringSlice(oldCachePod.ContainerIds, newContainerIds)
+ containerIdCompare.Compare()
+ deletedPodInfo.containerIds = containerIdCompare.GetRemovedElements()

+ // Check the ports specified.
+ newPorts := make([]int32, 0)
+ newHostPorts := make([]int32, 0)
+ for _, container := range newPod.Spec.Containers {
+ if len(container.Ports) == 0 {
+ newPorts = append(newPorts, 0)
+ continue
+ }
+ for _, port := range container.Ports {
+ newPorts = append(newPorts, port.ContainerPort)
+ // If hostPort is specified, the container was also cached by HostIP and HostPort
+ if port.HostPort != 0 {
+ newHostPorts = append(newHostPorts, port.HostPort)
+ }
+ }
+ }

+ portsCompare := compare.NewInt32Slice(oldCachePod.Ports, newPorts)
+ portsCompare.Compare()
+ deletedPodInfo.ports = portsCompare.GetRemovedElements()

+ hostPortsCompare := compare.NewInt32Slice(oldCachePod.HostPorts, newHostPorts)
+ hostPortsCompare.Compare()
+ deletedPodInfo.hostPorts = hostPortsCompare.GetRemovedElements()

+ // Enqueue the stale entries; they are removed after a grace period of a few seconds
+ podDeleteQueueMut.Lock()
+ podDeleteQueue = append(podDeleteQueue, deleteRequest{
+ podInfo: deletedPodInfo,
+ ts: time.Now(),
+ })
+ podDeleteQueueMut.Unlock()
}

func onDelete(obj interface{}) {
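
The rewritten OnUpdate no longer deletes and re-adds the whole pod: it overrides the cache with the new pod immediately and only defers removal of what actually disappeared, i.e. container IDs, ports, and host ports present in the old cache entry but absent from the new pod. The deferred entries land on podDeleteQueue. As a rough sketch of the consuming side — the loop name, tick interval, grace period, and the deletePodInfo helper below are assumptions for illustration, not part of this diff:

func podDeleteLoop(gracePeriod time.Duration) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for now := range ticker.C {
		podDeleteQueueMut.Lock()
		i := 0
		for ; i < len(podDeleteQueue); i++ {
			// The queue is append-only, so it is ordered by enqueue time.
			if now.Sub(podDeleteQueue[i].ts) < gracePeriod {
				break
			}
			// deletePodInfo is a hypothetical helper that evicts the
			// recorded IPs, ports, and container IDs from MetaDataCache.
			deletePodInfo(podDeleteQueue[i].podInfo)
		}
		podDeleteQueue = podDeleteQueue[i:]
		podDeleteQueueMut.Unlock()
	}
}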
49 changes: 49 additions & 0 deletions collector/pkg/compare/int32_slice.go
@@ -0,0 +1,49 @@
package compare

type Int32Slice struct {
oldElements []int32
oldFlags []bool
newElements []int32
newFlags []bool
}

func NewInt32Slice(oldElements []int32, newElements []int32) Int32Slice {
return Int32Slice{
oldElements: oldElements,
oldFlags: make([]bool, len(oldElements)),
newElements: newElements,
newFlags: make([]bool, len(newElements)),
}
}

func (c *Int32Slice) Compare() {
// Elements may be repeated, so every pair is checked; a flag marks
// elements that also appear somewhere in the other slice.
for i, newElement := range c.newElements {
for j, oldElement := range c.oldElements {
if oldElement == newElement {
c.newFlags[i] = true
c.oldFlags[j] = true
}
}
}
}

func (c *Int32Slice) GetRemovedElements() []int32 {
ret := make([]int32, 0)
for i, flag := range c.oldFlags {
if !flag {
ret = append(ret, c.oldElements[i])
}
}
return ret
}

func (c *Int32Slice) GetAddedElements() []int32 {
ret := make([]int32, 0)
for i, flag := range c.newFlags {
if !flag {
ret = append(ret, c.newElements[i])
}
}
return ret
}
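
A quick usage sketch with hypothetical port values, mirroring how OnUpdate uses this package (the literal slices are invented for illustration; assumes the compare package is imported):

oldPorts := []int32{8080, 9090} // ports recorded in the cached pod
newPorts := []int32{8080, 8443} // ports found in the updated spec
c := compare.NewInt32Slice(oldPorts, newPorts)
c.Compare()
removed := c.GetRemovedElements() // [9090]: scheduled for delayed deletion
added := c.GetAddedElements()     // [8443]: already cached by onAdd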
29 changes: 29 additions & 0 deletions collector/pkg/compare/slice_test.go
@@ -0,0 +1,29 @@
package compare

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestInt32Slice(t *testing.T) {
oldElements := []int32{0, 0, 1, 2, 3}
newElements := []int32{1, 3, 4, 6}
compareSlice := NewInt32Slice(oldElements, newElements)
compareSlice.Compare()
removedElements := compareSlice.GetRemovedElements()
assert.ElementsMatch(t, removedElements, []int32{0, 0, 2})
addedElements := compareSlice.GetAddedElements()
assert.ElementsMatch(t, addedElements, []int32{4, 6})
}

func TestStringSlice(t *testing.T) {
oldElements := []string{"a", "b", "c"}
newElements := []string{"d", "e", "f"}
compareSlice := NewStringSlice(oldElements, newElements)
compareSlice.Compare()
removedElements := compareSlice.GetRemovedElements()
assert.ElementsMatch(t, removedElements, []string{"a", "b", "c"})
addedElements := compareSlice.GetAddedElements()
assert.ElementsMatch(t, addedElements, []string{"d", "e", "f"})
}
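
Note the membership-based flagging: a value that appears anywhere in both slices is marked as kept in every position, so duplicates are not counted per occurrence. A hypothetical case, not covered by the tests above:

dup := NewInt32Slice([]int32{1}, []int32{1, 1})
dup.Compare()
// Both occurrences of 1 match the single old element, so nothing is
// reported: GetRemovedElements() == [] and GetAddedElements() == [].
// For OnUpdate this appears benign, since cache entries are keyed by
// address and port rather than counted per occurrence.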
49 changes: 49 additions & 0 deletions collector/pkg/compare/string_slice.go
@@ -0,0 +1,49 @@
package compare

type StringSlice struct {
oldElements []string
oldFlags []bool
newElements []string
newFlags []bool
}

func NewStringSlice(oldElements []string, newElements []string) StringSlice {
return StringSlice{
oldElements: oldElements,
oldFlags: make([]bool, len(oldElements)),
newElements: newElements,
newFlags: make([]bool, len(newElements)),
}
}

func (c *StringSlice) Compare() {
// Elements may be repeated, so every pair is checked; a flag marks
// elements that also appear somewhere in the other slice.
for i, newElement := range c.newElements {
for j, oldElement := range c.oldElements {
if oldElement == newElement {
c.newFlags[i] = true
c.oldFlags[j] = true
}
}
}
}

func (c *StringSlice) GetRemovedElements() []string {
ret := make([]string, 0)
for i, flag := range c.oldFlags {
if !flag {
ret = append(ret, c.oldElements[i])
}
}
return ret
}

func (c *StringSlice) GetAddedElements() []string {
ret := make([]string, 0)
for i, flag := range c.newFlags {
if !flag {
ret = append(ret, c.newElements[i])
}
}
return ret
}
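
Int32Slice and StringSlice are line-for-line identical apart from the element type. If the module targets Go 1.18 or newer — an assumption, since the go.mod version is not shown here — a single generic type could back both; a minimal sketch, not part of this commit:

package compare

// Slice is a generic stand-in for Int32Slice and StringSlice.
type Slice[T comparable] struct {
	oldElements []T
	oldFlags    []bool
	newElements []T
	newFlags    []bool
}

func NewSlice[T comparable](oldElements, newElements []T) Slice[T] {
	return Slice[T]{
		oldElements: oldElements,
		oldFlags:    make([]bool, len(oldElements)),
		newElements: newElements,
		newFlags:    make([]bool, len(newElements)),
	}
}

func (c *Slice[T]) Compare() {
	// Same semantics as the concrete types: flag elements that also
	// appear somewhere in the other slice.
	for i, newElement := range c.newElements {
		for j, oldElement := range c.oldElements {
			if oldElement == newElement {
				c.newFlags[i] = true
				c.oldFlags[j] = true
			}
		}
	}
}

func (c *Slice[T]) GetRemovedElements() []T {
	ret := make([]T, 0)
	for i, flag := range c.oldFlags {
		if !flag {
			ret = append(ret, c.oldElements[i])
		}
	}
	return ret
}

// GetAddedElements is symmetric, iterating newFlags/newElements instead.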
