Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR - SCTP Multi homing #5

Merged
merged 12 commits into from
May 25, 2023
28 changes: 24 additions & 4 deletions cmd/loxilb-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package main
import (
"fmt"
"kube-loxilb/pkg/agent/config"
"kube-loxilb/pkg/agent/manager/loadbalancer"
"kube-loxilb/pkg/api"
"kube-loxilb/pkg/ippool"
"kube-loxilb/pkg/k8s"
"kube-loxilb/pkg/log"
"os"
"os/signal"
"strings"
"syscall"
"time"

"kube-loxilb/pkg/agent/manager/loadbalancer"
"kube-loxilb/pkg/k8s"
"kube-loxilb/pkg/log"

"k8s.io/client-go/informers"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -89,6 +89,25 @@ func run(o *Options) error {
return err
}

var sipPools []*ippool.IPPool
if o.config.ExternalSecondaryCIDRs != "" {
CIDRs := strings.Split(o.config.ExternalSecondaryCIDRs, ",")
if len(CIDRs) <= 0 && len(CIDRs) > 4 {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", o.config.ExternalSecondaryCIDRs)
}

for _, CIDR := range CIDRs {
ipPool, err := ippool.NewIPPool(tk.IpAllocatorNew(), CIDR, !o.config.ExclIPAM)
if err != nil {
klog.Errorf("failed to create external secondary IP Pool (CIDR: %s)", CIDR)
return err
}

networkConfig.ExternalSecondaryCIDRs = append(networkConfig.ExternalSecondaryCIDRs, CIDR)
sipPools = append(sipPools, ipPool)
}
}

loxiAliveCh := make(chan *api.LoxiClient)
var loxilbClients []*api.LoxiClient
for _, lbURL := range networkConfig.LoxilbURLs {
Expand All @@ -104,6 +123,7 @@ func run(o *Options) error {
k8sClient,
loxilbClients,
ipPool,
sipPools,
networkConfig,
informerFactory,
)
Expand Down
2 changes: 2 additions & 0 deletions cmd/loxilb-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type AgentConfig struct {
LoxilbLoadBalancerClass string `yaml:"loxilbLoadBalancerClass,omitempty"`
// support LoadBalancer external IP
ExternalCIDR string `yaml:"externalCIDR,omitempty"`
// support LoadBalancer external secondary IP. This is a comma separated list
ExternalSecondaryCIDRs string `yaml:"ExternalSecondaryCIDRs,omitempty"`
// support BGP protocol
SetBGP bool `yaml:"setBGP,omitempty"`
// loxilb loadbalancer mode
Expand Down
14 changes: 14 additions & 0 deletions cmd/loxilb-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (o *Options) addFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.configFile, "config", o.configFile, "The path to the configuration file")
fs.StringVar(&loxiURLFlag, "loxiURL", loxiURLFlag, "loxilb API server URL(s)")
fs.StringVar(&o.config.ExternalCIDR, "externalCIDR", o.config.ExternalCIDR, "External CIDR Range")
fs.StringVar(&o.config.ExternalSecondaryCIDRs, "externalSecondaryCIDRs", o.config.ExternalCIDR, "External Secondary CIDR Range(s)")
fs.StringVar(&o.config.LoxilbLoadBalancerClass, "loxilbLoadBalancerClass", o.config.LoxilbLoadBalancerClass, "Load-Balancer Class Name")
fs.BoolVar(&o.config.SetBGP, "setBGP", o.config.SetBGP, "Use BGP routing")
fs.BoolVar(&o.config.ExclIPAM, "setUniqueIP", o.config.ExclIPAM, "Use unique IPAM per service")
Expand Down Expand Up @@ -88,6 +89,19 @@ func (o *Options) validate(args []string) error {
}
}

if o.config.ExternalSecondaryCIDRs != "" {
CIDRs := strings.Split(o.config.ExternalSecondaryCIDRs, ",")
if len(CIDRs) <= 0 && len(CIDRs) > 4 {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", o.config.ExternalSecondaryCIDRs)
}

for _, CIDR := range CIDRs {
if _, _, err := net.ParseCIDR(CIDR); err != nil {
return fmt.Errorf("externalSecondaryCIDR %s config is invalid", CIDR)
}
}
}

if o.config.LoxilbLoadBalancerClass != "" {
if ok := strings.Contains(o.config.LoxilbLoadBalancerClass, "/"); !ok {
return fmt.Errorf("loxilbLoadBalancerClass must be a label-style identifier")
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type NetworkConfig struct {
LoxilbURLs []string
LoxilbLoadBalancerClass string
ExternalCIDR string
ExternalSecondaryCIDRs []string
SetBGP bool
SetLBMode uint16
Monitor bool
Expand Down
100 changes: 90 additions & 10 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import (
"context"
"errors"
"fmt"
"net"
"path"
"reflect"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -38,6 +32,12 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"net"
"path"
"reflect"
"strconv"
"strings"
"time"

"kube-loxilb/pkg/agent/config"
"kube-loxilb/pkg/api"
Expand All @@ -53,6 +53,7 @@ const (
defaultWorkers = 4
LoxiMaxWeight = 10
LoxiMultusServiceAnnotation = "loxilb.io/multus-nets"
numSecIPAnnotation = "loxilb.io/num-secondary-networks"
)

type Manager struct {
Expand All @@ -66,13 +67,15 @@ type Manager struct {
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
ExternalIPPool *ippool.IPPool
ExtSecondaryIPPools []*ippool.IPPool

queue workqueue.RateLimitingInterface
lbCache LbCacheTable
}

type LbCacheEntry struct {
State string
SecIPs []string
LbModelList []api.LoadBalancerModel
}

Expand Down Expand Up @@ -100,6 +103,7 @@ func NewLoadBalancerManager(
kubeClient clientset.Interface,
loxiClients []*api.LoxiClient,
externalIPPool *ippool.IPPool,
externalSecondaryIPPools []*ippool.IPPool,
networkConfig *config.NetworkConfig,
informerFactory informers.SharedInformerFactory) *Manager {

Expand All @@ -109,6 +113,7 @@ func NewLoadBalancerManager(
kubeClient: kubeClient,
loxiClients: loxiClients,
ExternalIPPool: externalIPPool,
ExtSecondaryIPPools: externalSecondaryIPPools,
networkConfig: networkConfig,
serviceInformer: serviceInformer,
serviceLister: serviceInformer.Lister(),
Expand Down Expand Up @@ -234,10 +239,22 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return nil
}

numSecondarySvc := 0

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 {
return nil
}

// Check for loxilb specific annotations
if na := svc.Annotations[numSecIPAnnotation]; na != "" {
num, err := strconv.Atoi(na)
if err != nil {
numSecondarySvc = 0
} else {
numSecondarySvc = num
}
}

// Check for loxilb specific annotation
_, needPodEP := svc.Annotations[LoxiMultusServiceAnnotation]
endpointIPs, err := m.getEndpoints(svc, needPodEP)
Expand All @@ -250,7 +267,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
if !added {
//c.lbCache[cacheKey] = make([]api.LoadBalancerModel, 0)
m.lbCache[cacheKey] = &LbCacheEntry{
State: "Added",
State: "Added",
SecIPs: []string{},
}
}

Expand All @@ -262,6 +280,18 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
return err
}

if !added {
ingSecSvcPairs, err := m.getIngressSecSvcPairs(svc, numSecondarySvc)
if err != nil {
return err
}
klog.Infof("Secondary IP Pairs %v", ingSecSvcPairs)

for _, ingSecSvcPair := range ingSecSvcPairs {
m.lbCache[cacheKey].SecIPs = append(m.lbCache[cacheKey].SecIPs, ingSecSvcPair.IPString)
}
}

update := false
if len(m.lbCache[cacheKey].LbModelList) <= 0 {
update = true
Expand Down Expand Up @@ -316,7 +346,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
var errChList []chan error
var lbModelList []api.LoadBalancerModel
for _, port := range svc.Spec.Ports {
lbModel, err := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, svc, port, endpointIPs, needPodEP)
lbModel, err := m.makeLoxiLoadBalancerModel(ingSvcPair.IPString, m.lbCache[cacheKey].SecIPs, svc, port, endpointIPs, needPodEP)
if err != nil {
return err
}
Expand Down Expand Up @@ -409,6 +439,11 @@ func (m *Manager) deleteLoadBalancer(ns, name string) error {
return fmt.Errorf("failed to delete loxiLB LoadBalancer")
}
m.ExternalIPPool.ReturnIPAddr(lb.Service.ExternalIP, uint32(lb.Service.Port), lb.Service.Protocol)
for idx, ingSecIP := range lbEntry.SecIPs {
if idx < len(m.ExtSecondaryIPPools) {
m.ExtSecondaryIPPools[idx].ReturnIPAddr(ingSecIP, uint32(lb.Service.Port), lb.Service.Protocol)
}
}
}

delete(m.lbCache, cacheKey)
Expand Down Expand Up @@ -544,6 +579,43 @@ func (m *Manager) getIngressSvcPairs(service *corev1.Service) ([]SvcPair, error)
return sPairs, nil
}

// getIngressSecSvcPairs returns a set of secondary IPs
func (m *Manager) getIngressSecSvcPairs(service *corev1.Service, numSecondary int) ([]SvcPair, error) {
var sPairs []SvcPair

if len(m.ExtSecondaryIPPools) < numSecondary {
klog.Errorf("failed to generate external secondary IP. No IP pools")
return sPairs, errors.New("failed to generate external secondary IP. No IP pools")
}

for i := 0; i < numSecondary; i++ {
for _, port := range service.Spec.Ports {
pool := m.ExtSecondaryIPPools[i]
proto := strings.ToLower(string(port.Protocol))
portNum := port.Port
newIP := pool.GetNewIPAddr(uint32(portNum), proto)
if newIP == nil {
// This is a safety code in case the service has the same port.
for _, s := range sPairs {
if s.Port == portNum && s.Protocol == proto {
continue
}
}
for j := 0; j < i; j++ {
rpool := m.ExtSecondaryIPPools[j]
rpool.ReturnIPAddr(sPairs[j].IPString, uint32(portNum), proto)
}
klog.Errorf("failed to generate external secondary IP. IP Pool is full")
return nil, errors.New("failed to generate external secondary IP. IP Pool is full")
}
sp := SvcPair{newIP.String(), portNum, proto}
sPairs = append(sPairs, sp)
}
}

return sPairs, nil
}

func (m *Manager) getLoadBalancerServiceIngressIPs(service *corev1.Service) []string {
var ips []string
for _, ingress := range service.Status.LoadBalancer.Ingress {
Expand All @@ -553,8 +625,9 @@ func (m *Manager) getLoadBalancerServiceIngressIPs(service *corev1.Service) []st
return ips
}

func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Service, port corev1.ServicePort, endpointIPs []string, needPodEP bool) (api.LoadBalancerModel, error) {
func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, secIPs []string, svc *corev1.Service, port corev1.ServicePort, endpointIPs []string, needPodEP bool) (api.LoadBalancerModel, error) {
loxiEndpointModelList := []api.LoadBalancerEndpoint{}
loxiSecIPModelList := []api.LoadBalancerSecIp{}

if len(endpointIPs) > 0 {
endpointWeight := uint8(LoxiMaxWeight / len(endpointIPs))
Expand Down Expand Up @@ -584,6 +657,12 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Servi
}
}

if len(secIPs) > 0 {
for _, secIP := range secIPs {
loxiSecIPModelList = append(loxiSecIPModelList, api.LoadBalancerSecIp{SecondaryIP: secIP})
}
}

return api.LoadBalancerModel{
Service: api.LoadBalancerService{
ExternalIP: externalIP,
Expand All @@ -593,7 +672,8 @@ func (m *Manager) makeLoxiLoadBalancerModel(externalIP string, svc *corev1.Servi
Mode: api.LbMode(m.networkConfig.SetLBMode),
Monitor: m.networkConfig.Monitor,
},
Endpoints: loxiEndpointModelList,
SecondaryIPs: loxiSecIPModelList,
Endpoints: loxiEndpointModelList,
}, nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/api/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ func (lbListModel *LoadBalancerListModel) GetKeyStruct() LoxiModel {
}

type LoadBalancerModel struct {
Service LoadBalancerService `json:"serviceArguments"`
Endpoints []LoadBalancerEndpoint `json:"endpoints"`
Service LoadBalancerService `json:"serviceArguments"`
SecondaryIPs []LoadBalancerSecIp `json:"secondaryIPs"`
Endpoints []LoadBalancerEndpoint `json:"endpoints"`
}

func (lbModel *LoadBalancerModel) GetKeyStruct() LoxiModel {
Expand Down Expand Up @@ -47,6 +48,10 @@ type LoadBalancerEndpoint struct {
State string `json:"state"`
}

type LoadBalancerSecIp struct {
SecondaryIP string `json:"secondaryIP"`
}

type LoadBalancerAPI struct {
resource string
provider string
Expand Down