From a85bf889c546da1ad16ad8d3cd655db9b0cba025 Mon Sep 17 00:00:00 2001 From: Milan Lenco Date: Wed, 2 Oct 2024 18:49:15 +0200 Subject: [PATCH] Make sure that K3s service prefix is routed via cluster interface Ensure that packets destined for K3s services do not use the default route, but are instead routed through the cluster port. This guarantees that traffic handled by kube-proxy is properly SNATed to the cluster IP. That's the theory at least. We're not entirely certain. Without this route, however, some Longhorn pods fail to access K3s services when the cluster IP is configured on a non-default port. Signed-off-by: Milan Lenco --- pkg/pillar/devicenetwork/pbr.go | 4 ++ pkg/pillar/dpcreconciler/linux.go | 68 ++++++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/pkg/pillar/devicenetwork/pbr.go b/pkg/pillar/devicenetwork/pbr.go index 541d48dca7..50308c2158 100644 --- a/pkg/pillar/devicenetwork/pbr.go +++ b/pkg/pillar/devicenetwork/pbr.go @@ -4,6 +4,10 @@ package devicenetwork const ( + // KubeSvcRT : index of the routing table used for the Kubernetes service prefix. + // Only used in the Kubevirt mode. + KubeSvcRT = 400 + // DPCBaseRTIndex : base index for per-port routing tables used for device // connectivity (between EVE and remote endpoints, such as the controller), // i.e. used for DevicePortConfig (abbreviated to DPC). diff --git a/pkg/pillar/dpcreconciler/linux.go b/pkg/pillar/dpcreconciler/linux.go index f6c92c8261..a147ab5a2f 100644 --- a/pkg/pillar/dpcreconciler/linux.go +++ b/pkg/pillar/dpcreconciler/linux.go @@ -304,7 +304,8 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { } } if ev.Deleted { - changed := r.updateCurrentRoutes(r.lastArgs.DPC) + changed := r.updateCurrentRoutes(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile( L3SG, "interface delete triggered route change", true) @@ -315,13 +316,14 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { if changed { r.addPendingReconcile(L3SG, "address change", true) } - changed = r.updateCurrentRoutes(r.lastArgs.DPC) + changed = r.updateCurrentRoutes(r.lastArgs.DPC, r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile(L3SG, "address change triggered route change", true) } case netmonitor.DNSInfoChange: - newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC) + newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) prevGlobalCfg := r.intendedState.SubGraph(GlobalSG) if len(prevGlobalCfg.DiffItems(newGlobalCfg)) > 0 { r.addPendingReconcile(GlobalSG, "DNS info change", true) @@ -449,7 +451,7 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile var intSG dg.Graph switch reconcileSG { case GlobalSG: - intSG = r.getIntendedGlobalCfg(args.DPC) + intSG = r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus) case NetworkIoSG: intSG = r.getIntendedNetworkIO(args.DPC) case PhysicalIfsSG: @@ -703,7 +705,7 @@ func (r *LinuxDpcReconciler) updateCurrentState(args Args) (changed bool) { if addrsChanged := r.updateCurrentAdapterAddrs(args.DPC); addrsChanged { changed = true } - if routesChanged := r.updateCurrentRoutes(args.DPC); routesChanged { + if routesChanged := r.updateCurrentRoutes(args.DPC, args.ClusterStatus); routesChanged { changed = true } return changed @@ -792,7 +794,8 @@ func (r *LinuxDpcReconciler) updateCurrentAdapterAddrs( return false } -func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (changed bool) { +func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) (changed bool) { sgPath := dg.NewSubGraphPath(L3SG, RoutesSG) currentRoutes := dg.New(dg.InitArgs{Name: RoutesSG}) for _, port := range dpc.Ports { @@ -819,6 +822,20 @@ func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (ch r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d: %v", ifIndex, err) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel { + k3sSvcRoutes, err := r.NetworkMonitor.ListRoutes(netmonitor.RouteFilters{ + FilterByTable: true, + Table: devicenetwork.KubeSvcRT, + FilterByIf: true, + IfIndex: ifIndex, + }) + if err == nil { + routes = append(routes, k3sSvcRoutes...) + } else { + r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d "+ + "and the KubeSvc table: %v", ifIndex, err) + } + } for _, rt := range routes { currentRoutes.PutItem(linux.Route{ Route: rt.Data.(netlink.Route), @@ -844,7 +861,7 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) { Description: "Device Connectivity provided using Linux network stack", } r.intendedState = dg.New(graphArgs) - r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC)) + r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus)) r.intendedState.PutSubGraph(r.getIntendedNetworkIO(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedPhysicalIfs(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedLogicalIO(args.DPC)) @@ -854,7 +871,8 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) { args.FlowlogEnabled)) } -func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: GlobalSG, Description: "Global configuration", @@ -871,10 +889,14 @@ func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg Priority: devicenetwork.PbrKubeNetworkPrio, Table: unix.RT_TABLE_MAIN, }, nil) + tableForKubeSvc := unix.RT_TABLE_MAIN + if clusterStatus.ClusterInterface != "" { + tableForKubeSvc = devicenetwork.KubeSvcRT + } intendedCfg.PutItem(linux.IPRule{ Dst: kubeSvcCIDR, Priority: devicenetwork.PbrKubeNetworkPrio, - Table: unix.RT_TABLE_MAIN, + Table: tableForKubeSvc, }, nil) } if len(dpc.Ports) == 0 { @@ -1076,7 +1098,7 @@ func (r *LinuxDpcReconciler) getIntendedL3Cfg(dpc types.DevicePortConfig, intendedL3 := dg.New(graphArgs) intendedL3.PutSubGraph(r.getIntendedAdapters(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedSrcIPRules(dpc)) - intendedL3.PutSubGraph(r.getIntendedRoutes(dpc)) + intendedL3.PutSubGraph(r.getIntendedRoutes(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedArps(dpc)) return intendedL3 } @@ -1176,7 +1198,8 @@ func (r *LinuxDpcReconciler) getIntendedSrcIPRules(dpc types.DevicePortConfig) d return intendedRules } -func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: RoutesSG, Description: "IP routes", @@ -1218,6 +1241,29 @@ func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Gr AdapterLL: port.Logicallabel, }, nil) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel && + clusterStatus.ClusterIPPrefix != nil { + // Ensure that packets destined for K3s services do not use the default route, + // but are instead routed through the cluster port. This guarantees that traffic + // handled by kube-proxy is properly SNATed to the cluster IP. That's the theory + // at least. We're not entirely certain. Without this route, however, + // some Longhorn pods fail to access K3s services when the cluster IP is configured + // on a non-default port. + intendedRoutes.PutItem(linux.Route{ + Route: netlink.Route{ + LinkIndex: ifIndex, + Family: netlink.FAMILY_V4, + Scope: netlink.SCOPE_UNIVERSE, + Protocol: unix.RTPROT_STATIC, + Type: unix.RTN_UNICAST, + Dst: kubeSvcCIDR, + Gw: clusterStatus.ClusterIPPrefix.IP, + Table: devicenetwork.KubeSvcRT, + }, + AdapterIfName: port.IfName, + AdapterLL: port.Logicallabel, + }, nil) + } } return intendedRoutes }