Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR - Revisited TrafficPolicyTypeLocal implementation for incluster mode #93

Merged
merged 1 commit into from
Jan 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions pkg/agent/manager/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type LbCacheEntry struct {
LbMode int
Timeout int
ActCheck bool
PrefLocal bool
Addr string
State string
ProbeType string
Expand Down Expand Up @@ -304,6 +305,10 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
probePort := 0
probeReq := ""
probeResp := ""
prefLocal := false
if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
prefLocal = true
}

if strings.Compare(*lbClassName, m.networkConfig.LoxilbLoadBalancerClass) != 0 {
return nil
Expand Down Expand Up @@ -447,6 +452,7 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
m.lbCache[cacheKey] = &LbCacheEntry{
LbMode: lbMode,
ActCheck: livenessCheck,
PrefLocal: prefLocal,
Timeout: timeout,
State: "Added",
ProbeType: probeType,
Expand Down Expand Up @@ -690,17 +696,8 @@ func (m *Manager) addLoadBalancer(svc *corev1.Service) error {
ch := make(chan error)
go func(c *api.LoxiClient, h chan error) {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for _, lb := range lbModelList {
if err = c.LoadBalancer().Create(ctx, &lb.LbModel); err != nil {
if !strings.Contains(err.Error(), "exist") {
klog.Errorf("failed to create load-balancer(%s) :%v", c.Url, err)
break
} else {
err = nil
}
}
err = m.installLB(c, lb, m.lbCache[cacheKey].PrefLocal)
}
h <- err
}(client, ch)
Expand Down Expand Up @@ -831,6 +828,36 @@ func (m *Manager) DeleteAllLoadBalancer() {
m.lbCache = nil
}

func (m *Manager) installLB(c *api.LoxiClient, lb LbModelEnt, prefLocal bool) error {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Model := lb.LbModel
model := &Model

// Optimization for local Preference
if prefLocal {
model.Endpoints = nil
for _, ep := range lb.LbModel.Endpoints {
if ep.EndpointIP == c.Host {
model.Endpoints = append(model.Endpoints, ep)
}
}
if len(model.Endpoints) <= 0 {
model.Endpoints = lb.LbModel.Endpoints
}
}
if err = c.LoadBalancer().Create(ctx, model); err != nil {
if !strings.Contains(err.Error(), "exist") {
klog.Errorf("failed to create load-balancer(%s) :%v", c.Url, err)
} else {
err = nil
}
}

return err
}

// getEndpoints return LB's endpoints IP list.
// If podEP is true, return multus endpoints list.
// If false, return worker nodes IP list.
Expand Down Expand Up @@ -1136,15 +1163,8 @@ func (m *Manager) makeLoxiLoadBalancerModel(lbArgs *LbArgs, svc *corev1.Service,
lbModeSvc := api.LbMode(m.networkConfig.SetLBMode)

if len(lbArgs.endpointIPs) > 0 {
endpointWeight := uint8(LoxiMaxWeight / len(lbArgs.endpointIPs))
remainderWeight := uint8(LoxiMaxWeight % len(lbArgs.endpointIPs))

for _, endpoint := range lbArgs.endpointIPs {
weight := endpointWeight
if remainderWeight > 0 {
weight++
remainderWeight--
}

tport := uint16(port.NodePort)
if lbArgs.needPodEP {
Expand All @@ -1158,7 +1178,7 @@ func (m *Manager) makeLoxiLoadBalancerModel(lbArgs *LbArgs, svc *corev1.Service,
loxiEndpointModelList = append(loxiEndpointModelList, api.LoadBalancerEndpoint{
EndpointIP: endpoint,
TargetPort: tport,
Weight: weight,
Weight: 1,
})
}
}
Expand Down Expand Up @@ -1592,11 +1612,7 @@ loop:
for _, value := range m.lbCache {
for _, lb := range value.LbModelList {
for retry := 0; retry < 5; retry++ {
err := func(lbModel *api.LoadBalancerModel) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return aliveClient.LoadBalancer().Create(ctx, lbModel)
}(&lb.LbModel)
err := m.installLB(aliveClient, lb, value.PrefLocal)
if err == nil {
klog.Infof("reinstallLoxiLbRules: lbModel: %v success", lb)
isSuccess = true
Expand Down
Loading