Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Nov 25, 2024
1 parent 7c12ca3 commit 26ff36a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 54 deletions.
27 changes: 7 additions & 20 deletions pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func getIdxRules(resourceType string) []IdxFunc {
case NODE:
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPPortKey, generateContainerIDKey, generateHostIPKey}
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPPortKey}
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
Expand Down Expand Up @@ -266,18 +266,12 @@ func generateNameWithNamespaceKey(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func generatePodIPPortKey(obj interface{}) ([]string, error) {
func generatePodIPKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, fmt.Errorf("object is not a pod")
}
result := make([]string, 0)
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
result = append(result, fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort))
}
}
return result, nil
return []string{pod.Status.PodIP}, nil
}

func generateContainerIDKey(obj interface{}) ([]string, error) {
Expand All @@ -300,31 +294,24 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
return []string{pod.Status.HostIP}, nil
}

func generateServiceIPPortKey(obj interface{}) ([]string, error) {
func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
for _, port := range svc.Spec.Ports {
results = append(results, fmt.Sprintf("%s:%d", ip, port.Port))
}
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
for _, port := range svc.Spec.Ports {
results = append(results, fmt.Sprintf("%s:%d", ip, port.Port))
}
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
for _, port := range svc.Spec.Ports {
results = append(results, fmt.Sprintf("%s:%d", svc.Spec.LoadBalancerIP, port.Port))
}
}
return results, nil
}
138 changes: 104 additions & 34 deletions pkg/helper/k8smeta/k8s_meta_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8smeta
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
Expand All @@ -21,15 +20,6 @@ type requestBody struct {
Keys []string `json:"keys"`
}

type ipPort struct {
IP string `json:"ip"`
Port int `json:"port"`
}

type ipPortRequestBody struct {
Keys []ipPort `json:"keys"`
}

type metadataHandler struct {
metaManager *MetaManager
}
Expand Down Expand Up @@ -86,7 +76,7 @@ func (m *metadataHandler) handler(handleFunc func(w http.ResponseWriter, r *http

func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var rBody ipPortRequestBody
var rBody requestBody
// Decode the JSON data into the struct
err := json.NewDecoder(r.Body).Decode(&rBody)
if err != nil {
Expand All @@ -97,39 +87,119 @@ func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.R
// Get the metadata
metadata := make(map[string]*PodMetadata)
for _, key := range rBody.Keys {
objs := m.metaManager.cacheMap[POD].Get([]string{fmt.Sprintf("%s:%d", key.IP, key.Port)})
ipPort := strings.Split(key, ":")
if len(ipPort) == 0 {
continue
}
ip := ipPort[0]
objs := m.metaManager.cacheMap[POD].Get([]string{ip})
if len(objs) == 0 {
// try service IP
svcObjs := m.metaManager.cacheMap[SERVICE].Get([]string{fmt.Sprintf("%s:%d", key.IP, key.Port)})
for key, svcObj := range svcObjs {
service, ok := svcObj[0].Raw.(*v1.Service)
if !ok {
continue
}
lm := newLabelMatcher(service, labels.SelectorFromSet(service.Spec.Selector))
podObjs := m.metaManager.cacheMap[POD].Filter(func(ow *ObjectWrapper) bool {
pod := ow.Raw.(*v1.Pod)
if pod.Namespace != service.Namespace {
return false
}
return lm.selector.Matches(labels.Set(pod.Labels))
}, 1)
if len(podObjs) != 0 {
podMetadata := m.convertObj2PodResponse(podObjs[0])
podMetadata.ServiceName = service.Name
metadata[key] = podMetadata
}
podMetadata := m.findPodByServiceIPPort(ipPort)
if podMetadata != nil {
metadata[key] = podMetadata
}
} else {
for key, obj := range objs {
podMetadata := m.convertObj2PodResponse(obj[0])
podMetadata := m.findPodByPodIPPort(ipPort, objs)
if podMetadata != nil {
metadata[key] = podMetadata
}
}
}
wrapperResponse(w, metadata)
}

func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata {
ip := ipPort[0]
// try service IP
svcObjs := m.metaManager.cacheMap[SERVICE].Get([]string{ip})
if len(svcObjs) == 0 {
return nil
}
var service *v1.Service
if len(ipPort) == 2 {
expectedPort, err := strconv.Atoi(ipPort[1])
if err != nil {
return nil
}
for _, obj := range svcObjs[ip] {
svc, ok := obj.Raw.(*v1.Service)
if !ok {
continue
}
portMatch := false
for _, port := range svc.Spec.Ports {
if port.Port == int32(expectedPort) {
portMatch = true
break
}
}
if !portMatch {
continue
}
service = svc
break
}
} else {
for _, obj := range svcObjs[ip] {
// if no port specified, use the first service
svc, ok := obj.Raw.(*v1.Service)
if !ok {
continue
}
service = svc
break
}
}

// find pod by service
lm := newLabelMatcher(service, labels.SelectorFromSet(service.Spec.Selector))
podObjs := m.metaManager.cacheMap[POD].Filter(func(ow *ObjectWrapper) bool {
pod := ow.Raw.(*v1.Pod)
if pod.Namespace != service.Namespace {
return false
}
return lm.selector.Matches(labels.Set(pod.Labels))
}, 1)
if len(podObjs) != 0 {
podMetadata := m.convertObj2PodResponse(podObjs[0])
podMetadata.ServiceName = service.Name
return podMetadata
}
return nil
}

func (m *metadataHandler) findPodByPodIPPort(ipPort []string, objs map[string][]*ObjectWrapper) *PodMetadata {
ip := ipPort[0]
if len(ipPort) == 2 {
expectedPort, err := strconv.Atoi(ipPort[1])
if err != nil {
return nil
}
for _, obj := range objs[ip] {
pod := obj.Raw.(*v1.Pod)
for _, container := range pod.Spec.Containers {
portMatch := false
for _, port := range container.Ports {
if port.ContainerPort == int32(expectedPort) {
portMatch = true
break
}
}
if !portMatch {
continue
}
podMetadata := m.convertObj2PodResponse(obj)
return podMetadata
}
}
} else {
// without port
podMetadata := m.convertObj2PodResponse(objs[ip][0])
return podMetadata
}
return nil
}

func (m *metadataHandler) convertObj2PodResponse(obj *ObjectWrapper) *PodMetadata {
pod := obj.Raw.(*v1.Pod)
podMetadata := m.getCommonPodMetadata(pod)
Expand Down

0 comments on commit 26ff36a

Please sign in to comment.