Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to select endpoints based on node labels #145

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 96 additions & 20 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
endPointSelAnnotation = "loxilb.io/epselect"
zoneSelAnnotation = "loxilb.io/zoneselect"
prefLocalPodAnnotation = "loxilb.io/prefLocalPod"
matchNodeLabelAnnotation = "loxilb.io/nodelabel"
MaxExternalSecondaryIPsNum = 4
)

Expand Down Expand Up @@ -132,6 +133,7 @@ type LbCacheEntry struct {
PrefLocal bool
Addr string
State string
NodeLabel string
ProbeType string
ProbePort uint16
ProbeReq string
Expand Down Expand Up @@ -177,6 +179,17 @@ func GenSPKey(IPString string, Port uint16, Protocol string) string {
return fmt.Sprintf("%s:%v:%s", IPString, Port, Protocol)
}

func genExtIPName(ipStr string) string {
prefix := "llb-"
IP := net.ParseIP(ipStr)
if IP != nil {
if IP.IsUnspecified() {
return "llbanyextip"
}
}
return prefix + ipStr
}

// Create and Init Manager.
// Manager is called by kube-loxilb when k8s service is created & updated.
func NewLoadBalancerManager(
Expand Down Expand Up @@ -352,11 +365,17 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
probeRetries := 0
prefLocal := false
epSelect := api.LbSelRr
matchNodeLabel := ""

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 && !needPodEP {
return nil
}

// Check for loxilb specific annotations - MatchNodeLabel
if mnl := svc.Annotations[matchNodeLabelAnnotation]; mnl != "" {
matchNodeLabel = mnl
}

// Check for loxilb specific annotations - PreferLocalPodAlways
if plp := svc.Annotations[prefLocalPodAnnotation]; plp != "" {
if plp == "yes" {
Expand Down Expand Up @@ -527,7 +546,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
numSecondarySvc = 0
}

endpointIPs, err := m.getEndpoints(svc, needPodEP, addrType)
endpointIPs, err := m.getEndpoints(svc, needPodEP, addrType, matchNodeLabel)
if err != nil {
klog.Errorf("getEndpoints return error.")
klog.V(4).Infof("endpointIPs: %v", endpointIPs)
Expand All @@ -550,6 +569,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
PrefLocal: prefLocal,
Timeout: timeout,
State: "Added",
NodeLabel: matchNodeLabel,
ProbeType: probeType,
ProbePort: uint16(probePort),
ProbeReq: probeReq,
Expand Down Expand Up @@ -636,6 +656,15 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
klog.Infof("%s: addr-type update", cacheKey)
}

if matchNodeLabel != m.lbCache[cacheKey].NodeLabel {
m.lbCache[cacheKey].NodeLabel = matchNodeLabel
update = true
if added {
needDelete = true
}
klog.Infof("%s: nodelabel update", cacheKey)
}

if timeout != m.lbCache[cacheKey].Timeout {
m.lbCache[cacheKey].Timeout = timeout
update = true
Expand Down Expand Up @@ -856,7 +885,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
sp.LbModelList = append(sp.LbModelList, lbModel)
m.lbCache[cacheKey].LbServicePairs[GenSPKey(sp.ExternalIP, sp.Port, sp.Protocol)] = &sp
if ingSvcPair.InRange || ingSvcPair.StaticIP {
retIngress := corev1.LoadBalancerIngress{Hostname: "llb-" + ingSvcPair.IPString}
retIngress := corev1.LoadBalancerIngress{Hostname: genExtIPName(ingSvcPair.IPString)}
if !m.checkServiceIngressIPExists(svc, retIngress.Hostname) {
svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, retIngress)
}
Expand Down Expand Up @@ -1009,24 +1038,65 @@ func (m *Manager) installLB(c *api.LoxiClient, lb api.LoadBalancerModel, prefLoc
return err
}

// getNodeEndpointsWithLabel returns the IP list of nodes available with match labels
func (m *Manager) getNodeEndpointsWithLabel(addrType string, matchLabel string) ([]string, error) {
klog.Infof("getNodeEndpointsWithLabel: label %s", matchLabel)
req, err := labels.NewRequirement(matchLabel, selection.Exists, []string{})
if err != nil {
klog.Infof("getNodeEndpointsWithLabel: failed to make label requirement. err: %v", err)
return nil, err
}

nodes, err := m.nodeLister.List(labels.NewSelector().Add(*req))
if err != nil {
klog.Infof("getNodeEndpointsWithLabel: failed to get nodeList. err: %v", err)
return nil, err
}

var endpoints []string
for _, node := range nodes {
addr, err := m.getNodeAddress(*node, addrType)
if err != nil {
klog.Errorf(err.Error())
continue
}
klog.Infof("getNodeEndpointsWithLabel: found node %s with label %s", addr, matchLabel)
endpoints = append(endpoints, addr)
}

return endpoints, nil
}

// getEndpoints return LB's endpoints IP list.
// If podEP is true, return multus endpoints list.
// If false, return worker nodes IP list.
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool, addrType string) ([]string, error) {
func (m *Manager) getEndpoints(svc *corev1.Service, podEP bool, addrType, matchNodeLabel string) ([]string, error) {
if podEP {
//klog.Infof("getEndpoints: Pod end-points")
return m.getMultusEndpoints(svc, addrType)
}

var matchNodeList []string
var err error
if matchNodeLabel != "" {
matchNodeList, err = m.getNodeEndpointsWithLabel(addrType, matchNodeLabel)
if err != nil {
return nil, err
}
if len(matchNodeList) <= 0 {
matchNodeList = append(matchNodeList, "xdeadbeefx")
}
}

if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
//klog.Infof("getEndpoints: Traffic Policy Local")
return k8s.GetServiceLocalEndpoints(m.kubeClient, svc, addrType)
//klog.Infof("getEndpoints: Traffic Policy Local %d", len(matchNodeList))
return k8s.GetServiceLocalEndpoints(m.kubeClient, svc, addrType, matchNodeList)
}
return m.getNodeEndpoints(addrType)
return m.getNodeEndpoints(addrType, matchNodeList)
}

// getNodeEndpoints returns the IP list of nodes available as nodePort service.
func (m *Manager) getNodeEndpoints(addrType string) ([]string, error) {
func (m *Manager) getNodeEndpoints(addrType string, nodeMatchList []string) ([]string, error) {
req, err := labels.NewRequirement("node.kubernetes.io/exclude-from-external-load-balancers", selection.DoesNotExist, []string{})
if err != nil {
klog.Infof("getEndpoints: failed to make label requirement. err: %v", err)
Expand All @@ -1039,7 +1109,7 @@ func (m *Manager) getNodeEndpoints(addrType string) ([]string, error) {
return nil, err
}

return m.getEndpointsForLB(nodes, addrType), nil
return m.getEndpointsForLB(nodes, addrType, nodeMatchList), nil
}

// getLocalEndpoints returns the IP list of the Pods connected to the multus network.
Expand Down Expand Up @@ -1093,14 +1163,17 @@ func (m *Manager) getNodeAddress(node corev1.Node, addrType string) (string, err
return "", errors.New("no address with family found for host")
}

func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []string {
func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string, nodeMatchList []string) []string {
var endpoints []string
for _, node := range nodes {
addr, err := m.getNodeAddress(*node, addrType)
if err != nil {
klog.Errorf(err.Error())
continue
}
if len(nodeMatchList) > 0 && !k8s.MatchNodeinNodeList(addr, nodeMatchList) {
continue
}
endpoints = append(endpoints, addr)
}

Expand All @@ -1110,7 +1183,7 @@ func (m *Manager) getEndpointsForLB(nodes []*corev1.Node, addrType string) []str
func (m *Manager) checkUpdateExternalIP(ingSvcPairs []SvcPair, svc *corev1.Service) bool {
for _, ingSvcPair := range ingSvcPairs {
if ingSvcPair.InRange || ingSvcPair.StaticIP {
retIngress := corev1.LoadBalancerIngress{Hostname: "llb-" + ingSvcPair.IPString}
retIngress := corev1.LoadBalancerIngress{Hostname: genExtIPName(ingSvcPair.IPString)}
if !m.checkServiceIngressIPExists(svc, retIngress.Hostname) {
klog.V(4).Infof("checkUpdateExternalIP: ingSvcPair %v has external IP but service %s has no IP. need update.", ingSvcPair, svc.Name)
return true
Expand Down Expand Up @@ -1185,19 +1258,22 @@ func (m *Manager) getServiceIngressIPs(service *corev1.Service) []string {
ingressIP = ingress.IP

} else if ingress.Hostname != "" {
llbHost := strings.Split(ingress.Hostname, "-")

if len(llbHost) != 2 {
if net.ParseIP(llbHost[0]) != nil {
ingressIP = llbHost[0]

}
if ingress.Hostname == "llbanyextip" {
ingressIP = "0.0.0.0"
} else {
if llbHost[0] == "llb" {
if net.ParseIP(llbHost[1]) != nil {
ingressIP = llbHost[1]
llbHost := strings.Split(ingress.Hostname, "-")

if len(llbHost) != 2 {
if net.ParseIP(llbHost[0]) != nil {
ingressIP = llbHost[0]

}
} else {
if llbHost[0] == "llb" {
if net.ParseIP(llbHost[1]) != nil {
ingressIP = llbHost[1]
}
}
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import (
"context"
"errors"
"fmt"
"net"
"time"

tk "github.com/loxilb-io/loxilib"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
"net"
"time"
)

// GetNodeAddr gets the available IP address of a Node.
Expand All @@ -53,8 +52,17 @@ func GetNodeAddr(node *v1.Node) (net.IP, error) {
return ipAddr, nil
}

func MatchNodeinNodeList(node string, nodeMatchList []string) bool {
for _, n := range nodeMatchList {
if n == node {
return true
}
}
return false
}

// GetServiceLocalEndpoints - Get HostIPs of pods belonging to the given service
func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service, addrType string) ([]string, error) {
func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Service, addrType string, nodeMatchList []string) ([]string, error) {
var epList []string

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand All @@ -72,6 +80,9 @@ func GetServiceLocalEndpoints(kubeClient clientset.Interface, svc *corev1.Servic
if addrType == "ipv6" && !tk.IsNetIPv6(pod.Status.HostIP) {
continue
}
if len(nodeMatchList) > 0 && !MatchNodeinNodeList(pod.Status.HostIP, nodeMatchList) {
continue
}
if _, found := epMap[pod.Status.HostIP]; !found {
epMap[pod.Status.HostIP] = struct{}{}
epList = append(epList, pod.Status.HostIP)
Expand Down
Loading