diff --git a/.golangci.yml b/.golangci.yml index 6ce5d2f82..36ec83145 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -5,6 +5,7 @@ linters: - bodyclose - depguard - dogsled + - dupl - errcheck - exportloopref - gofmt diff --git a/pkg/controllers/netpol/policy.go b/pkg/controllers/netpol/policy.go index 19d1f153b..57836a39a 100644 --- a/pkg/controllers/netpol/policy.go +++ b/pkg/controllers/netpol/policy.go @@ -106,19 +106,15 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo activePolicyChains[policyChainName] = true - currnetPodIps := make([]string, 0, len(policy.targetPods)) + currentPodIPs := make([]string, 0, len(policy.targetPods)) for ip := range policy.targetPods { - currnetPodIps = append(currnetPodIps, ip) + currentPodIPs = append(currentPodIPs, ip) } if policy.policyType == "both" || policy.policyType == "ingress" { // create a ipset for all destination pod ip's matched by the policy spec PodSelector targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) - setEntries := make([][]string, 0) - for _, podIP := range currnetPodIps { - setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(targetDestPodIPSetName, setEntries, utils.TypeHashIP) + npc.createGenericHashIPSet(targetDestPodIPSetName, utils.TypeHashIP, currentPodIPs) err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err @@ -128,11 +124,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo if policy.policyType == "both" || policy.policyType == "egress" { // create a ipset for all source pod ip's matched by the policy spec PodSelector targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) - setEntries := make([][]string, 0) - for _, podIP := range currnetPodIps { - setEntries = append(setEntries, []string{podIP, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(targetSourcePodIPSetName, setEntries, utils.TypeHashIP) + npc.createGenericHashIPSet(targetSourcePodIPSetName, utils.TypeHashIP, currentPodIPs) err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err @@ -151,6 +143,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo return activePolicyChains, activePolicyIPSets, nil } +//nolint:dupl // This is as simple as this function gets even though it repeats some of processEgressRules func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error { @@ -164,47 +157,41 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo // run through all the ingress rules in the spec and create iptables rules // in the chain for the network policy - for i, ingressRule := range policy.ingressRules { + for ruleIdx, ingressRule := range policy.ingressRules { if len(ingressRule.srcPods) != 0 { - srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i) - activePolicyIPSets[srcPodIPSetName] = true - setEntries := make([][]string, 0) - for _, pod := range ingressRule.srcPods { - setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(srcPodIPSetName, setEntries, utils.TypeHashIP) + srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, ruleIdx) + 
+ // Create policy based ipset with source pod IPs + npc.createPolicyIndexedIPSet(activePolicyIPSets, srcPodIPSetName, utils.TypeHashIP, + getIPsFromPods(ingressRule.srcPods)) + // If the ingress policy contains port declarations, we need to make sure that we match on pod IP and port if len(ingressRule.ports) != 0 { - // case where 'ports' details and 'from' details specified in the ingress rule - // so match on specified source and destination ip's and specified port (if any) and protocol - for _, portProtocol := range ingressRule.ports { - comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + - policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { - return err - } + if err := npc.createPodWithPortPolicyRule(ingressRule.ports, policy, policyChainName, + srcPodIPSetName, targetDestPodIPSetName); err != nil { + return err } } + // If the ingress policy contains named port declarations, we need to make sure that we match on pod IP and + // the resolved port number if len(ingressRule.namedPorts) != 0 { - for j, endPoints := range ingressRule.namedPorts { - namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - activePolicyIPSets[namedPortIPSetName] = true - setEntries := make([][]string, 0) - for _, ip := range endPoints.ips { - setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) + for portIdx, eps := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, + portIdx) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, namedPortIPSetName, + eps.protocol, eps.port, eps.endport); err != nil { return err } } } + // If the ingress policy contains no ports at all create the policy based only on IP if len(ingressRule.ports) == 0 && len(ingressRule.namedPorts) == 0 { // case where no 'ports' details specified in the ingress rule but 'from' details specified // so match on specified source and destination ip with all port and protocol @@ -216,8 +203,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo } } - // case where only 'ports' details specified but no 'from' details in the ingress rule - // so match on all sources, with specified port (if any) and protocol + // case where only 'ports' details specified but no 'from' details in the ingress rule so match on all sources, + // with specified port (if any) and protocol if ingressRule.matchAllSource && !ingressRule.matchAllPorts { for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + @@ -227,25 +214,21 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo } } - for j, endPoints := range ingressRule.namedPorts { - namedPortIPSetName 
:= policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - activePolicyIPSets[namedPortIPSetName] = true - setEntries := make([][]string, 0) - for _, ip := range endPoints.ips { - setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) + for portIdx, eps := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, + portIdx) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { return err } } } - // case where nether ports nor from details are speified in the ingress rule - // so match on all ports, protocol, source IP's + // case where neither ports nor from details are specified in the ingress rule so match on all ports, protocol, + // source IP's if ingressRule.matchAllSource && ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace @@ -255,7 +238,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo } if len(ingressRule.srcIPBlocks) != 0 { - srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i) + srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, ruleIdx) activePolicyIPSets[srcIPBlockIPSetName] = true npc.ipSetHandler.RefreshSet(srcIPBlockIPSetName, ingressRule.srcIPBlocks, utils.TypeHashNet) @@ -268,17 +251,14 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo } } - for j, endPoints := range ingressRule.namedPorts { - namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) - activePolicyIPSets[namedPortIPSetName] = true - setEntries := make([][]string, 0) - for _, ip := range endPoints.ips { - setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashNet) + for portIdx, eps := range ingressRule.namedPorts { + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, + portIdx) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashNet, eps.ips) + comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { return err } } @@ -296,6 +276,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo return nil } +//nolint:dupl // This is as simple as this function gets even though it repeats some of processIngressRules func 
(npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error { @@ -309,46 +290,40 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, // run through all the egress rules in the spec and create iptables rules // in the chain for the network policy - for i, egressRule := range policy.egressRules { + for ruleIdx, egressRule := range policy.egressRules { if len(egressRule.dstPods) != 0 { - dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i) - activePolicyIPSets[dstPodIPSetName] = true - setEntries := make([][]string, 0) - for _, pod := range egressRule.dstPods { - setEntries = append(setEntries, []string{pod.ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(dstPodIPSetName, setEntries, utils.TypeHashIP) + dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, ruleIdx) + + // Create policy based ipset with destination pod IPs + npc.createPolicyIndexedIPSet(activePolicyIPSets, dstPodIPSetName, utils.TypeHashIP, + getIPsFromPods(egressRule.dstPods)) + + // If the egress policy contains port declarations, we need to make sure that we match on pod IP and port if len(egressRule.ports) != 0 { - // case where 'ports' details and 'from' details specified in the egress rule - // so match on specified source and destination ip's and specified port (if any) and protocol - for _, portProtocol := range egressRule.ports { - comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + - policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { - return err - } + if err := npc.createPodWithPortPolicyRule(egressRule.ports, policy, policyChainName, + targetSourcePodIPSetName, dstPodIPSetName); err != nil { + return err } } + // If the egress policy contains named port declarations, we need to make sure that we match on pod IP and + // the resolved port number if len(egressRule.namedPorts) != 0 { - for j, endPoints := range egressRule.namedPorts { - namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j) - activePolicyIPSets[namedPortIPSetName] = true - setEntries := make([][]string, 0) - for _, ip := range endPoints.ips { - setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) - } - npc.ipSetHandler.RefreshSet(namedPortIPSetName, setEntries, utils.TypeHashIP) + for portIdx, eps := range egressRule.namedPorts { + namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, ruleIdx, + portIdx) + npc.createPolicyIndexedIPSet(activePolicyIPSets, namedPortIPSetName, utils.TypeHashIP, eps.ips) + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port, endPoints.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { return err } } - } + // If the egress policy contains no ports at all create the policy based only on IP if len(egressRule.ports) == 0 && 
len(egressRule.namedPorts) == 0 { // case where no 'ports' details specified in the ingress rule but 'from' details specified // so match on specified source and destination ip with all port and protocol @@ -360,8 +335,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, } } - // case where only 'ports' details specified but no 'to' details in the egress rule - // so match on all sources, with specified port (if any) and protocol + // case where only 'ports' details specified but no 'to' details in the egress rule so match on all sources, + // with specified port (if any) and protocol if egressRule.matchAllDestinations && !egressRule.matchAllPorts { for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + @@ -379,8 +354,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, } } - // case where nether ports nor from details are speified in the egress rule - // so match on all ports, protocol, source IP's + // case where neither ports nor from details are specified in the egress rule so match on all ports, protocol, + // source IP's if egressRule.matchAllDestinations && egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace @@ -388,8 +363,9 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return err } } + if len(egressRule.dstIPBlocks) != 0 { - dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i) + dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, ruleIdx) activePolicyIPSets[dstIPBlockIPSetName] = true npc.ipSetHandler.RefreshSet(dstIPBlockIPSetName, egressRule.dstIPBlocks, utils.TypeHashNet) if !egressRule.matchAllPorts { diff --git a/pkg/controllers/netpol/utils.go b/pkg/controllers/netpol/utils.go index 1e685857f..ac1487cf4 100644 --- a/pkg/controllers/netpol/utils.go +++ b/pkg/controllers/netpol/utils.go @@ -6,6 +6,7 @@ import ( "regexp" "strconv" + "github.com/cloudnativelabs/kube-router/pkg/utils" api "k8s.io/api/core/v1" ) @@ -61,3 +62,41 @@ func validateNodePortRange(nodePortOption string) (string, error) { } return fmt.Sprintf("%d:%d", port1, port2), nil } + +func getIPsFromPods(pods []podInfo) []string { + ips := make([]string, len(pods)) + for idx, pod := range pods { + ips[idx] = pod.ip + } + return ips +} + +func (npc *NetworkPolicyController) createGenericHashIPSet(ipsetName, hashType string, ips []string) { + setEntries := make([][]string, 0) + for _, ip := range ips { + setEntries = append(setEntries, []string{ip, utils.OptionTimeout, "0"}) + } + npc.ipSetHandler.RefreshSet(ipsetName, setEntries, hashType) +} + +// createPolicyIndexedIPSet creates a policy based ipset and indexes it as an active ipset +func (npc *NetworkPolicyController) createPolicyIndexedIPSet( + activePolicyIPSets map[string]bool, ipsetName, hashType string, ips []string) { + activePolicyIPSets[ipsetName] = true + npc.createGenericHashIPSet(ipsetName, hashType, ips) +} + +// createPodWithPortPolicyRule handles the case where port details are provided by the ingress/egress rule and creates +// an iptables rule that matches on both the source/dest IPs and the port +func (npc *NetworkPolicyController) createPodWithPortPolicyRule( + ports []protocolAndPort, policy networkPolicyInfo, policyName string, srcSetName 
string, dstSetName string) error { + for _, portProtocol := range ports { + comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + + policy.name + " namespace " + policy.namespace + if err := npc.appendRuleToPolicyChain(policyName, comment, srcSetName, dstSetName, portProtocol.protocol, + portProtocol.port, portProtocol.endport); err != nil { + return err + } + } + return nil +} diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 2b9686564..37ce2583e 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -86,9 +86,11 @@ type ipvsCalls interface { type netlinkCalls interface { ipAddrAdd(iface netlink.Link, ip string, addRoute bool) error ipAddrDel(iface netlink.Link, ip string) error - prepareEndpointForDsr(containerID string, endpointIP string, vip string) error + prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error getKubeDummyInterface() (netlink.Link, error) setupRoutesForExternalIPForDSR(serviceInfoMap) error + prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error + configureContainerForDSR(vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error setupPolicyRoutingForDSR() error cleanupMangleTableRule(ip string, protocol string, port string, fwmark string, tcpMSS int) error } @@ -1018,20 +1020,6 @@ func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) return nil, errors.New("Failed to find pod with ip " + endpointIP) } -func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) { - err := netns.Set(hostNSHandle) - if err != nil { - klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) - } - activeNetworkNamespaceHandle, err := netns.Get() - if err != nil { - klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) - return - } - klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) - _ = activeNetworkNamespaceHandle.Close() -} - // This function does the following // - get the pod corresponding to the endpoint ip // - get the container id from pod spec @@ -1040,9 +1028,9 @@ func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) { // - add VIP to the tunnel interface // - disable rp_filter // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. -func (ln *linuxNetworking) prepareEndpointForDsr(containerID string, endpointIP string, vip string) error { +func (ln *linuxNetworking) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error { - // FIXME: its possible switch namespaces may never work safely in GO without hacks. + // It's possible that switching namespaces may never work safely in Go without hacks. 
// https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion // https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix // Dont know if same issue, but seen namespace issue, so adding @@ -1055,150 +1043,43 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerID string, endpointIP hostNetworkNamespaceHandle, err := netns.Get() if err != nil { - return errors.New("Failed to get namespace due to " + err.Error()) + return fmt.Errorf("failed to get namespace due to %v", err) } defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle) activeNetworkNamespaceHandle, err = netns.Get() if err != nil { - return errors.New("Failed to get namespace due to " + err.Error()) + return fmt.Errorf("failed to get namespace due to %v", err) } - klog.V(1).Infof("Current network namespace before netns.Set: " + activeNetworkNamespaceHandle.String()) + klog.V(1).Infof("Current network namespace before netns.Set: %s", activeNetworkNamespaceHandle.String()) defer utils.CloseCloserDisregardError(&activeNetworkNamespaceHandle) dockerClient, err := client.NewClientWithOpts(client.FromEnv) if err != nil { - return errors.New("Failed to get docker client due to " + err.Error()) + return fmt.Errorf("failed to get docker client due to %v", err) } defer utils.CloseCloserDisregardError(dockerClient) containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerID) if err != nil { - return errors.New("Failed to get docker container spec due to " + err.Error()) + return fmt.Errorf("failed to get docker container spec due to %v", err) } pid := containerSpec.State.Pid - endpointNamespaceHandle, err := netns.GetFromPid(pid) - if err != nil { - return errors.New("Failed to get endpoint namespace due to " + err.Error()) - } - defer utils.CloseCloserDisregardError(&endpointNamespaceHandle) - - err = netns.Set(endpointNamespaceHandle) - if err != nil { - return errors.New("Failed to enter to endpoint namespace due to " + err.Error()) - } - - activeNetworkNamespaceHandle, err = netns.Get() - if err != nil { - return errors.New("Failed to get activeNetworkNamespace due to " + err.Error()) - } - klog.V(2).Infof("Current network namespace after netns. Set to container network namespace: " + activeNetworkNamespaceHandle.String()) - _ = activeNetworkNamespaceHandle.Close() - - // create a ipip tunnel interface inside the endpoint container - tunIf, err := netlink.LinkByName(KubeTunnelIf) - if err != nil { - if err.Error() != IfaceNotFound { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err.Error()) - } - - klog.V(2).Infof("Could not find tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + " so creating one.") - ipTunLink := netlink.Iptun{ - LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf}, - Local: net.ParseIP(endpointIP), - } - err = netlink.LinkAdd(&ipTunLink) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to add ipip tunnel interface in endpoint namespace due to " + err.Error()) - } - - // TODO: this is ugly, but ran into issue multiple times where interface did not come up quickly. 
- // need to find the root cause - for retry := 0; retry < 60; retry++ { - time.Sleep(100 * time.Millisecond) - tunIf, err = netlink.LinkByName(KubeTunnelIf) - if err == nil { - break - } - if err.Error() == IfaceNotFound { - klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf) - continue - } else { - break - } - } - - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("Failed to get " + KubeTunnelIf + " tunnel interface handle due to " + err.Error()) - } - - klog.V(2).Infof("Successfully created tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + ".") - } - - // bring the tunnel interface up - err = netlink.LinkSetUp(tunIf) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to bring up ipip tunnel interface in endpoint namespace due to " + err.Error()) - } - - // assign VIP to the KUBE_TUNNEL_IF interface - err = ln.ipAddrAdd(tunIf, vip, false) - if err != nil && err.Error() != IfaceHasAddr { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to assign vip " + vip + " to kube-tunnel-if interface ") - } - klog.Infof("Successfully assigned VIP: " + vip + " in endpoint " + endpointIP + ".") - - // disable rp_filter on all interface - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on kube-tunnel-if in the endpoint container") - } - - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on eth0 in the endpoint container") - } - - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on `all` in the endpoint container") - } - - klog.Infof("Successfully disabled rp_filter in endpoint " + endpointIP + ".") - - err = netns.Set(hostNetworkNamespaceHandle) - if err != nil { - return errors.New("Failed to set hostNetworkNamespace handle due to " + err.Error()) - } - activeNetworkNamespaceHandle, err = netns.Get() - if err != nil { - return errors.New("Failed to get activeNetworkNamespace handle due to " + err.Error()) - } - klog.Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) - _ = activeNetworkNamespaceHandle.Close() - return nil + return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle) } // The same as prepareEndpointForDsr but using CRI instead of docker. -func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) (err error) { +func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error { - // FIXME: its possible switch namespaces may never work safely in GO without hacks. + // It's possible that switching namespaces may never work safely in Go without hacks. 
// https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion // https://www.weave.works/blog/linux-namespaces-and-go-don-t-mix // Dont know if same issue, but seen namespace issue, so adding // logs and boilerplate code and verbose logs for diagnosis if runtimeEndpoint == "" { - return errors.New("runtimeEndpoint is not specified") + return fmt.Errorf("runtimeEndpoint is not specified") } runtime.LockOSThread() @@ -1206,18 +1087,16 @@ func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, contain hostNetworkNamespaceHandle, err := netns.Get() if err != nil { - return errors.New("failed to get host namespace due to " + err.Error()) + return fmt.Errorf("failed to get host namespace due to %v", err) } - klog.V(1).Infof("current network namespace before netns.Set: " + hostNetworkNamespaceHandle.String()) + klog.V(1).Infof("current network namespace before netns.Set: %s", hostNetworkNamespaceHandle.String()) defer utils.CloseCloserDisregardError(&hostNetworkNamespaceHandle) rs, err := cri.NewRemoteRuntimeService(runtimeEndpoint, cri.DefaultConnectionTimeout) if err != nil { return err } - defer func() { - err = rs.Close() - }() + defer utils.CloseCloserDisregardError(rs) info, err := rs.ContainerInfo(containerID) if err != nil { @@ -1225,117 +1104,7 @@ func (ln *linuxNetworking) prepareEndpointForDsrWithCRI(runtimeEndpoint, contain } pid := info.Pid - endpointNamespaceHandle, err := netns.GetFromPid(pid) - if err != nil { - return fmt.Errorf("failed to get endpoint namespace (containerID=%s, pid=%d, error=%s)", containerID, pid, err) - } - defer utils.CloseCloserDisregardError(&endpointNamespaceHandle) - - err = netns.Set(endpointNamespaceHandle) - if err != nil { - return fmt.Errorf("failed to enter endpoint namespace (containerID=%s, pid=%d, error=%s)", containerID, pid, err) - } - - activeNetworkNamespaceHandle, err := netns.Get() - if err != nil { - return errors.New("failed to get activeNetworkNamespace due to " + err.Error()) - } - klog.V(2).Infof("Current network namespace after netns. Set to container network namespace: " + activeNetworkNamespaceHandle.String()) - _ = activeNetworkNamespaceHandle.Close() - - // TODO: fix boilerplate `netns.Set(hostNetworkNamespaceHandle)` code. Need a robust - // way to switch back to old namespace, pretty much all things will go wrong if we dont switch back - - // create a ipip tunnel interface inside the endpoint container - tunIf, err := netlink.LinkByName(KubeTunnelIf) - if err != nil { - if err.Error() != IfaceNotFound { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err.Error()) - } - - klog.V(2).Infof("Could not find tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + " so creating one.") - ipTunLink := netlink.Iptun{ - LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf}, - Local: net.ParseIP(endpointIP), - } - err = netlink.LinkAdd(&ipTunLink) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to add ipip tunnel interface in endpoint namespace due to " + err.Error()) - } - - // TODO: this is ugly, but ran into issue multiple times where interface did not come up quickly. 
- // need to find the root cause - for retry := 0; retry < 60; retry++ { - time.Sleep(100 * time.Millisecond) - tunIf, err = netlink.LinkByName(KubeTunnelIf) - if err == nil { - break - } - if err.Error() == IfaceNotFound { - klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf) - continue - } else { - break - } - } - - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to get " + KubeTunnelIf + " tunnel interface handle due to " + err.Error()) - } - - klog.V(2).Infof("Successfully created tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + ".") - } - - // bring the tunnel interface up - err = netlink.LinkSetUp(tunIf) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to bring up ipip tunnel interface in endpoint namespace due to " + err.Error()) - } - - // assign VIP to the KUBE_TUNNEL_IF interface - err = ln.ipAddrAdd(tunIf, vip, false) - if err != nil && err.Error() != IfaceHasAddr { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to assign vip " + vip + " to kube-tunnel-if interface ") - } - klog.Infof("Successfully assigned VIP: " + vip + " in endpoint " + endpointIP + ".") - - // disable rp_filter on all interface - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on kube-tunnel-if in the endpoint container") - } - - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on eth0 in the endpoint container") - } - - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640) - if err != nil { - attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) - return errors.New("failed to disable rp_filter on `all` in the endpoint container") - } - - klog.Infof("Successfully disabled rp_filter in endpoint " + endpointIP + ".") - - err = netns.Set(hostNetworkNamespaceHandle) - if err != nil { - return errors.New("Failed to set hostNetworkNamespace handle due to " + err.Error()) - } - activeNetworkNamespaceHandle, err = netns.Get() - if err != nil { - return errors.New("Failed to get activeNetworkNamespace handle due to " + err.Error()) - } - klog.Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) - _ = activeNetworkNamespaceHandle.Close() - return nil + return ln.configureContainerForDSR(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle) } func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { diff --git a/pkg/controllers/proxy/network_services_controller_moq.go b/pkg/controllers/proxy/network_services_controller_moq.go index 6b3c3baac..b4b37c12a 100644 --- a/pkg/controllers/proxy/network_services_controller_moq.go +++ b/pkg/controllers/proxy/network_services_controller_moq.go @@ -6,6 +6,7 @@ package proxy import ( "github.com/moby/ipvs" "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" "net" "sync" ) @@ -20,9 +21,12 @@ var _ LinuxNetworking = &LinuxNetworkingMock{} // // // make and configure a mocked LinuxNetworking // mockedLinuxNetworking := &LinuxNetworkingMock{ -// 
cleanupMangleTableRuleFunc: func(ip string, protocol string, port string, fwmark string) error { +// cleanupMangleTableRuleFunc: func(ip string, protocol string, port string, fwmark string, tcpMSS int) error { // panic("mock out the cleanupMangleTableRule method") // }, +// configureContainerForDSRFunc: func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error { +// panic("mock out the configureContainerForDSR method") +// }, // getKubeDummyInterfaceFunc: func() (netlink.Link, error) { // panic("mock out the getKubeDummyInterface method") // }, @@ -65,8 +69,11 @@ var _ LinuxNetworking = &LinuxNetworkingMock{} // ipvsUpdateServiceFunc: func(ipvsSvc *ipvs.Service) error { // panic("mock out the ipvsUpdateService method") // }, -// prepareEndpointForDsrFunc: func(containerID string, endpointIP string, vip string) error { -// panic("mock out the prepareEndpointForDsr method") +// prepareEndpointForDsrWithCRIFunc: func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error { +// panic("mock out the prepareEndpointForDsrWithCRI method") +// }, +// prepareEndpointForDsrWithDockerFunc: func(containerID string, endpointIP string, vip string) error { +// panic("mock out the prepareEndpointForDsrWithDocker method") // }, // setupPolicyRoutingForDSRFunc: func() error { // panic("mock out the setupPolicyRoutingForDSR method") @@ -84,6 +91,9 @@ type LinuxNetworkingMock struct { // cleanupMangleTableRuleFunc mocks the cleanupMangleTableRule method. cleanupMangleTableRuleFunc func(ip string, protocol string, port string, fwmark string, tcpMSS int) error + // configureContainerForDSRFunc mocks the configureContainerForDSR method. + configureContainerForDSRFunc func(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error + // getKubeDummyInterfaceFunc mocks the getKubeDummyInterface method. getKubeDummyInterfaceFunc func() (netlink.Link, error) @@ -126,8 +136,11 @@ type LinuxNetworkingMock struct { // ipvsUpdateServiceFunc mocks the ipvsUpdateService method. ipvsUpdateServiceFunc func(ipvsSvc *ipvs.Service) error - // prepareEndpointForDsrFunc mocks the prepareEndpointForDsr method. - prepareEndpointForDsrFunc func(containerID string, endpointIP string, vip string) error + // prepareEndpointForDsrWithCRIFunc mocks the prepareEndpointForDsrWithCRI method. + prepareEndpointForDsrWithCRIFunc func(runtimeEndpoint string, containerID string, endpointIP string, vip string) error + + // prepareEndpointForDsrWithDockerFunc mocks the prepareEndpointForDsrWithDocker method. + prepareEndpointForDsrWithDockerFunc func(containerID string, endpointIP string, vip string) error // setupPolicyRoutingForDSRFunc mocks the setupPolicyRoutingForDSR method. setupPolicyRoutingForDSRFunc func() error @@ -147,6 +160,21 @@ type LinuxNetworkingMock struct { Port string // Fwmark is the fwmark argument value. Fwmark string + // TcpMSS is the tcpMSS argument value. + TcpMSS int + } + // configureContainerForDSR holds details about calls to the configureContainerForDSR method. + configureContainerForDSR []struct { + // Vip is the vip argument value. + Vip string + // EndpointIP is the endpointIP argument value. + EndpointIP string + // ContainerID is the containerID argument value. + ContainerID string + // Pid is the pid argument value. + Pid int + // HostNetworkNamespaceHandle is the hostNetworkNamespaceHandle argument value. 
+ HostNetworkNamespaceHandle netns.NsHandle } // getKubeDummyInterface holds details about calls to the getKubeDummyInterface method. getKubeDummyInterface []struct { @@ -254,8 +282,19 @@ type LinuxNetworkingMock struct { // IpvsSvc is the ipvsSvc argument value. IpvsSvc *ipvs.Service } - // prepareEndpointForDsr holds details about calls to the prepareEndpointForDsr method. - prepareEndpointForDsr []struct { + // prepareEndpointForDsrWithCRI holds details about calls to the prepareEndpointForDsrWithCRI method. + prepareEndpointForDsrWithCRI []struct { + // RuntimeEndpoint is the runtimeEndpoint argument value. + RuntimeEndpoint string + // ContainerID is the containerID argument value. + ContainerID string + // EndpointIP is the endpointIP argument value. + EndpointIP string + // Vip is the vip argument value. + Vip string + } + // prepareEndpointForDsrWithDocker holds details about calls to the prepareEndpointForDsrWithDocker method. + prepareEndpointForDsrWithDocker []struct { // ContainerID is the containerID argument value. ContainerID string // EndpointIP is the endpointIP argument value. @@ -272,24 +311,26 @@ type LinuxNetworkingMock struct { ServiceInfoMapMoqParam serviceInfoMap } } - lockcleanupMangleTableRule sync.RWMutex - lockgetKubeDummyInterface sync.RWMutex - lockipAddrAdd sync.RWMutex - lockipAddrDel sync.RWMutex - lockipvsAddFWMarkService sync.RWMutex - lockipvsAddServer sync.RWMutex - lockipvsAddService sync.RWMutex - lockipvsDelDestination sync.RWMutex - lockipvsDelService sync.RWMutex - lockipvsGetDestinations sync.RWMutex - lockipvsGetServices sync.RWMutex - lockipvsNewDestination sync.RWMutex - lockipvsNewService sync.RWMutex - lockipvsUpdateDestination sync.RWMutex - lockipvsUpdateService sync.RWMutex - lockprepareEndpointForDsr sync.RWMutex - locksetupPolicyRoutingForDSR sync.RWMutex - locksetupRoutesForExternalIPForDSR sync.RWMutex + lockcleanupMangleTableRule sync.RWMutex + lockconfigureContainerForDSR sync.RWMutex + lockgetKubeDummyInterface sync.RWMutex + lockipAddrAdd sync.RWMutex + lockipAddrDel sync.RWMutex + lockipvsAddFWMarkService sync.RWMutex + lockipvsAddServer sync.RWMutex + lockipvsAddService sync.RWMutex + lockipvsDelDestination sync.RWMutex + lockipvsDelService sync.RWMutex + lockipvsGetDestinations sync.RWMutex + lockipvsGetServices sync.RWMutex + lockipvsNewDestination sync.RWMutex + lockipvsNewService sync.RWMutex + lockipvsUpdateDestination sync.RWMutex + lockipvsUpdateService sync.RWMutex + lockprepareEndpointForDsrWithCRI sync.RWMutex + lockprepareEndpointForDsrWithDocker sync.RWMutex + locksetupPolicyRoutingForDSR sync.RWMutex + locksetupRoutesForExternalIPForDSR sync.RWMutex } // cleanupMangleTableRule calls cleanupMangleTableRuleFunc. 
@@ -302,11 +343,13 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRule(ip string, protocol stri Protocol string Port string Fwmark string + TcpMSS int }{ IP: ip, Protocol: protocol, Port: port, Fwmark: fwmark, + TcpMSS: tcpMSS, } mock.lockcleanupMangleTableRule.Lock() mock.calls.cleanupMangleTableRule = append(mock.calls.cleanupMangleTableRule, callInfo) @@ -322,12 +365,14 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRuleCalls() []struct { Protocol string Port string Fwmark string + TcpMSS int } { var calls []struct { IP string Protocol string Port string Fwmark string + TcpMSS int } mock.lockcleanupMangleTableRule.RLock() calls = mock.calls.cleanupMangleTableRule @@ -335,6 +380,53 @@ func (mock *LinuxNetworkingMock) cleanupMangleTableRuleCalls() []struct { return calls } +// configureContainerForDSR calls configureContainerForDSRFunc. +func (mock *LinuxNetworkingMock) configureContainerForDSR(vip string, endpointIP string, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error { + if mock.configureContainerForDSRFunc == nil { + panic("LinuxNetworkingMock.configureContainerForDSRFunc: method is nil but LinuxNetworking.configureContainerForDSR was just called") + } + callInfo := struct { + Vip string + EndpointIP string + ContainerID string + Pid int + HostNetworkNamespaceHandle netns.NsHandle + }{ + Vip: vip, + EndpointIP: endpointIP, + ContainerID: containerID, + Pid: pid, + HostNetworkNamespaceHandle: hostNetworkNamespaceHandle, + } + mock.lockconfigureContainerForDSR.Lock() + mock.calls.configureContainerForDSR = append(mock.calls.configureContainerForDSR, callInfo) + mock.lockconfigureContainerForDSR.Unlock() + return mock.configureContainerForDSRFunc(vip, endpointIP, containerID, pid, hostNetworkNamespaceHandle) +} + +// configureContainerForDSRCalls gets all the calls that were made to configureContainerForDSR. +// Check the length with: +// len(mockedLinuxNetworking.configureContainerForDSRCalls()) +func (mock *LinuxNetworkingMock) configureContainerForDSRCalls() []struct { + Vip string + EndpointIP string + ContainerID string + Pid int + HostNetworkNamespaceHandle netns.NsHandle +} { + var calls []struct { + Vip string + EndpointIP string + ContainerID string + Pid int + HostNetworkNamespaceHandle netns.NsHandle + } + mock.lockconfigureContainerForDSR.RLock() + calls = mock.calls.configureContainerForDSR + mock.lockconfigureContainerForDSR.RUnlock() + return calls +} + // getKubeDummyInterface calls getKubeDummyInterfaceFunc. func (mock *LinuxNetworkingMock) getKubeDummyInterface() (netlink.Link, error) { if mock.getKubeDummyInterfaceFunc == nil { @@ -839,10 +931,53 @@ func (mock *LinuxNetworkingMock) ipvsUpdateServiceCalls() []struct { return calls } -// prepareEndpointForDsr calls prepareEndpointForDsrFunc. -func (mock *LinuxNetworkingMock) prepareEndpointForDsr(containerID string, endpointIP string, vip string) error { - if mock.prepareEndpointForDsrFunc == nil { - panic("LinuxNetworkingMock.prepareEndpointForDsrFunc: method is nil but LinuxNetworking.prepareEndpointForDsr was just called") +// prepareEndpointForDsrWithCRI calls prepareEndpointForDsrWithCRIFunc. 
+func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRI(runtimeEndpoint string, containerID string, endpointIP string, vip string) error { + if mock.prepareEndpointForDsrWithCRIFunc == nil { + panic("LinuxNetworkingMock.prepareEndpointForDsrWithCRIFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithCRI was just called") + } + callInfo := struct { + RuntimeEndpoint string + ContainerID string + EndpointIP string + Vip string + }{ + RuntimeEndpoint: runtimeEndpoint, + ContainerID: containerID, + EndpointIP: endpointIP, + Vip: vip, + } + mock.lockprepareEndpointForDsrWithCRI.Lock() + mock.calls.prepareEndpointForDsrWithCRI = append(mock.calls.prepareEndpointForDsrWithCRI, callInfo) + mock.lockprepareEndpointForDsrWithCRI.Unlock() + return mock.prepareEndpointForDsrWithCRIFunc(runtimeEndpoint, containerID, endpointIP, vip) +} + +// prepareEndpointForDsrWithCRICalls gets all the calls that were made to prepareEndpointForDsrWithCRI. +// Check the length with: +// len(mockedLinuxNetworking.prepareEndpointForDsrWithCRICalls()) +func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithCRICalls() []struct { + RuntimeEndpoint string + ContainerID string + EndpointIP string + Vip string +} { + var calls []struct { + RuntimeEndpoint string + ContainerID string + EndpointIP string + Vip string + } + mock.lockprepareEndpointForDsrWithCRI.RLock() + calls = mock.calls.prepareEndpointForDsrWithCRI + mock.lockprepareEndpointForDsrWithCRI.RUnlock() + return calls +} + +// prepareEndpointForDsrWithDocker calls prepareEndpointForDsrWithDockerFunc. +func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDocker(containerID string, endpointIP string, vip string) error { + if mock.prepareEndpointForDsrWithDockerFunc == nil { + panic("LinuxNetworkingMock.prepareEndpointForDsrWithDockerFunc: method is nil but LinuxNetworking.prepareEndpointForDsrWithDocker was just called") } callInfo := struct { ContainerID string @@ -853,16 +988,16 @@ func (mock *LinuxNetworkingMock) prepareEndpointForDsr(containerID string, endpo EndpointIP: endpointIP, Vip: vip, } - mock.lockprepareEndpointForDsr.Lock() - mock.calls.prepareEndpointForDsr = append(mock.calls.prepareEndpointForDsr, callInfo) - mock.lockprepareEndpointForDsr.Unlock() - return mock.prepareEndpointForDsrFunc(containerID, endpointIP, vip) + mock.lockprepareEndpointForDsrWithDocker.Lock() + mock.calls.prepareEndpointForDsrWithDocker = append(mock.calls.prepareEndpointForDsrWithDocker, callInfo) + mock.lockprepareEndpointForDsrWithDocker.Unlock() + return mock.prepareEndpointForDsrWithDockerFunc(containerID, endpointIP, vip) } -// prepareEndpointForDsrCalls gets all the calls that were made to prepareEndpointForDsr. +// prepareEndpointForDsrWithDockerCalls gets all the calls that were made to prepareEndpointForDsrWithDocker. 
// Check the length with: -// len(mockedLinuxNetworking.prepareEndpointForDsrCalls()) -func (mock *LinuxNetworkingMock) prepareEndpointForDsrCalls() []struct { +// len(mockedLinuxNetworking.prepareEndpointForDsrWithDockerCalls()) +func (mock *LinuxNetworkingMock) prepareEndpointForDsrWithDockerCalls() []struct { ContainerID string EndpointIP string Vip string @@ -872,9 +1007,9 @@ func (mock *LinuxNetworkingMock) prepareEndpointForDsrCalls() []struct { EndpointIP string Vip string } - mock.lockprepareEndpointForDsr.RLock() - calls = mock.calls.prepareEndpointForDsr - mock.lockprepareEndpointForDsr.RUnlock() + mock.lockprepareEndpointForDsrWithDocker.RLock() + calls = mock.calls.prepareEndpointForDsrWithDocker + mock.lockprepareEndpointForDsrWithDocker.RUnlock() return calls } diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index bdc318958..df626067d 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -427,7 +427,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser if runtime == "docker" { // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. - err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIPService.externalIP) + err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpoint.ip, externalIPService.externalIP) if err != nil { klog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error()) } diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go new file mode 100644 index 000000000..abd52ecb1 --- /dev/null +++ b/pkg/controllers/proxy/utils.go @@ -0,0 +1,148 @@ +package proxy + +import ( + "fmt" + "io/ioutil" + "net" + "strconv" + "time" + + "github.com/cloudnativelabs/kube-router/pkg/utils" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" + "k8s.io/klog/v2" +) + +func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) { + err := netns.Set(hostNSHandle) + if err != nil { + klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) + } + activeNetworkNamespaceHandle, err := netns.Get() + if err != nil { + klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) + return + } + klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) + _ = activeNetworkNamespaceHandle.Close() +} + +func (ln *linuxNetworking) configureContainerForDSR( + vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error { + endpointNamespaceHandle, err := netns.GetFromPid(pid) + if err != nil { + return fmt.Errorf("failed to get endpoint namespace (containerID=%s, pid=%d, error=%v)", + containerID, pid, err) + } + defer utils.CloseCloserDisregardError(&endpointNamespaceHandle) + + err = netns.Set(endpointNamespaceHandle) + if err != nil { + return fmt.Errorf("failed to enter endpoint namespace (containerID=%s, pid=%d, error=%v)", + containerID, pid, err) + } + + activeNetworkNamespaceHandle, err := netns.Get() + if err != nil { + return fmt.Errorf("failed to get activeNetworkNamespace due to %v", err) + } + klog.V(2).Infof("Current network namespace after netns. 
Set to container network namespace: %s", + activeNetworkNamespaceHandle.String()) + _ = activeNetworkNamespaceHandle.Close() + + // TODO: fix boilerplate `netns.Set(hostNetworkNamespaceHandle)` code. Need a robust + // way to switch back to old namespace, pretty much all things will go wrong if we don't switch back + + // create an ipip tunnel interface inside the endpoint container + tunIf, err := netlink.LinkByName(KubeTunnelIf) + if err != nil { + if err.Error() != IfaceNotFound { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to verify if ipip tunnel interface exists in endpoint %s namespace due "+ "to %v", endpointIP, err) + } + + klog.V(2).Infof("Could not find tunnel interface %s in endpoint %s so creating one.", + KubeTunnelIf, endpointIP) + ipTunLink := netlink.Iptun{ + LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf}, + Local: net.ParseIP(endpointIP), + } + err = netlink.LinkAdd(&ipTunLink) + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to add ipip tunnel interface in endpoint namespace due to %v", err) + } + + // this is ugly, but we ran into issues multiple times where the interface did not come up quickly. + for retry := 0; retry < 60; retry++ { + time.Sleep(100 * time.Millisecond) + tunIf, err = netlink.LinkByName(KubeTunnelIf) + if err == nil { + break + } + if err.Error() == IfaceNotFound { + klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf) + continue + } else { + break + } + } + + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to get %s tunnel interface handle due to %v", KubeTunnelIf, err) + } + + klog.V(2).Infof("Successfully created tunnel interface %s in endpoint %s.", KubeTunnelIf, endpointIP) + } + + // bring the tunnel interface up + err = netlink.LinkSetUp(tunIf) + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to bring up ipip tunnel interface in endpoint namespace due to %v", err) + } + + // assign VIP to the KUBE_TUNNEL_IF interface + err = ln.ipAddrAdd(tunIf, vip, false) + if err != nil && err.Error() != IfaceHasAddr { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to assign vip %s to kube-tunnel-if interface", vip) + } + klog.Infof("Successfully assigned VIP: %s in endpoint %s.", vip, endpointIP) + + // disable rp_filter on all interfaces + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640) + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to disable rp_filter on kube-tunnel-if in the endpoint container") + } + + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/eth0/rp_filter", []byte(strconv.Itoa(0)), 0640) + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to disable rp_filter on eth0 in the endpoint container") + } + + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/all/rp_filter", []byte(strconv.Itoa(0)), 0640) + if err != nil { + attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) + return fmt.Errorf("failed to disable rp_filter on `all` in the endpoint container") + } + + klog.Infof("Successfully disabled rp_filter in endpoint %s.", endpointIP) + + err = netns.Set(hostNetworkNamespaceHandle) + if err != nil { + return fmt.Errorf("failed to set hostNetworkNamespace handle due to %v", 
err) + } + activeNetworkNamespaceHandle, err = netns.Get() + if err != nil { + return fmt.Errorf("failed to get activeNetworkNamespace handle due to %v", err) + } + klog.Infof("Current network namespace after revert namespace to host network namespace: %s", + activeNetworkNamespaceHandle.String()) + _ = activeNetworkNamespaceHandle.Close() + return nil +} diff --git a/pkg/controllers/routing/network_routes_controller_test.go b/pkg/controllers/routing/network_routes_controller_test.go index a28e444d3..e8e34f8c2 100644 --- a/pkg/controllers/routing/network_routes_controller_test.go +++ b/pkg/controllers/routing/network_routes_controller_test.go @@ -515,6 +515,7 @@ func Test_advertiseExternalIPs(t *testing.T) { }, } + //nolint:dupl // There is no need to spend a lot of time de-duplicating test code for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { go testcase.nrc.bgpServer.Serve() @@ -704,6 +705,7 @@ func Test_advertiseAnnotationOptOut(t *testing.T) { }, } + //nolint:dupl // There is no need to spend a lot of time de-duplicating test code for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { go testcase.nrc.bgpServer.Serve() @@ -926,6 +928,7 @@ func Test_advertiseAnnotationOptIn(t *testing.T) { }, } + //nolint:dupl // There is no need to spend a lot of time de-duplicating test code for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { go testcase.nrc.bgpServer.Serve()
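Note on the netpol refactor: the diff collapses four copies of the ipset-entry loop into three helpers (getIPsFromPods, createGenericHashIPSet, createPolicyIndexedIPSet). A minimal, self-contained sketch of how the extracted pieces compose; refreshSet, the podInfo shape, and the literal "timeout" option here are simplified stand-ins for kube-router's ipSetHandler.RefreshSet, internal types, and utils.OptionTimeout, not the real API:

package main

import "fmt"

// podInfo mirrors the only field the helpers rely on.
type podInfo struct{ ip string }

// getIPsFromPods flattens a pod slice into the raw IPs that seed an ipset.
func getIPsFromPods(pods []podInfo) []string {
	ips := make([]string, len(pods))
	for idx, pod := range pods {
		ips[idx] = pod.ip
	}
	return ips
}

// refreshSet stands in for ipSetHandler.RefreshSet.
func refreshSet(name string, entries [][]string, hashType string) {
	fmt.Println(name, hashType, entries)
}

func createGenericHashIPSet(name, hashType string, ips []string) {
	entries := make([][]string, 0, len(ips))
	for _, ip := range ips {
		// each entry carries the IP plus the "timeout 0" option the controller always appends
		entries = append(entries, []string{ip, "timeout", "0"})
	}
	refreshSet(name, entries, hashType)
}

func createPolicyIndexedIPSet(active map[string]bool, name, hashType string, ips []string) {
	// marking the set active first keeps the sync loop from garbage-collecting it
	active[name] = true
	createGenericHashIPSet(name, hashType, ips)
}

func main() {
	active := map[string]bool{}
	pods := []podInfo{{ip: "10.0.0.1"}, {ip: "10.0.0.2"}}
	createPolicyIndexedIPSet(active, "ns-policy-0-src", "hash:ip", getIPsFromPods(pods))
}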
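Note on the dupl additions: enabling dupl in .golangci.yml makes golangci-lint flag structurally similar code blocks, and the //nolint:dupl directives scope exemptions tightly. Per golangci-lint's documented behavior, a nolint comment placed on the line immediately before a declaration silences the named linter for that entire block and nothing else. A small illustration with hypothetical functions:

package main

//nolint:dupl // ingress and egress handling are intentionally parallel
func processIngress() error {
	// every statement in this function is exempt from the dupl linter
	return nil
}

func processEgress() error {
	// this function is still checked; duplication here would be reported
	return nil
}

func main() { _, _ = processIngress(), processEgress() }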
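Note on the error handling: the diff moves from errors.New("... " + err.Error()) to fmt.Errorf("... %v", err), which drops the string concatenation but still flattens the cause to text. The sketch below contrasts that with %w, which would additionally keep the chain inspectable via errors.Is and errors.As; this is an observation about the Go standard library, not a change the PR makes:

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

func main() {
	_, err := os.Open("/nonexistent")

	// %v flattens the cause into text, which is what the diff's fmt.Errorf calls do.
	flat := fmt.Errorf("failed to get namespace due to %v", err)

	// %w additionally records the cause so callers can inspect it.
	wrapped := fmt.Errorf("failed to get namespace due to %w", err)

	fmt.Println(errors.Is(flat, fs.ErrNotExist))    // false: the chain is lost
	fmt.Println(errors.Is(wrapped, fs.ErrNotExist)) // true: the chain survives
}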
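Note on the DSR refactor: prepareEndpointForDsr is split into thin, runtime-specific front ends (docker ContainerInspect vs. CRI ContainerInfo) that each resolve a container PID and then delegate to the shared configureContainerForDSR in pkg/controllers/proxy/utils.go. A runnable sketch of that shape; the resolvePid* helpers are hypothetical stand-ins for the docker and CRI client calls, and the shared core is reduced to a print:

package main

import "fmt"

// Hypothetical stand-ins for the runtime-specific PID lookups.
func resolvePidDocker(containerID string) (int, error)              { return 4242, nil }
func resolvePidCRI(runtimeEndpoint, containerID string) (int, error) { return 4242, nil }

// configureContainerForDSR is where the shared logic now lives: enter the
// container's network namespace by PID, create the ipip tunnel, assign the
// VIP, relax rp_filter, then switch back to the host namespace.
func configureContainerForDSR(vip, endpointIP, containerID string, pid int) error {
	fmt.Printf("configuring container %s (pid %d) with VIP %s for endpoint %s\n",
		containerID, pid, vip, endpointIP)
	return nil
}

func prepareEndpointForDsrWithDocker(containerID, endpointIP, vip string) error {
	pid, err := resolvePidDocker(containerID)
	if err != nil {
		return fmt.Errorf("failed to get docker container spec due to %v", err)
	}
	return configureContainerForDSR(vip, endpointIP, containerID, pid)
}

func prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error {
	if runtimeEndpoint == "" {
		return fmt.Errorf("runtimeEndpoint is not specified")
	}
	pid, err := resolvePidCRI(runtimeEndpoint, containerID)
	if err != nil {
		return err
	}
	return configureContainerForDSR(vip, endpointIP, containerID, pid)
}

func main() {
	_ = prepareEndpointForDsrWithDocker("abc123", "10.0.0.5", "192.0.2.10")
}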
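Note on the retained poll loop: configureContainerForDSR keeps the 60-attempt, 100ms wait for the kube-tunnel-if link to appear. The control flow generalizes to a small helper, sketched below under one simplification: it uses errors.Is against a sentinel, whereas the real code compares err.Error() to IfaceNotFound because netlink's error is not a wrapped sentinel:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotFound = errors.New("Link not found")

// waitForLink polls lookup until it succeeds, fails with something other than
// "not found", or the attempt budget runs out.
func waitForLink(lookup func() error, attempts int, interval time.Duration) error {
	var err error
	for retry := 0; retry < attempts; retry++ {
		time.Sleep(interval)
		err = lookup()
		if err == nil || !errors.Is(err, errNotFound) {
			// success, or a real failure that retrying won't fix
			break
		}
	}
	return err
}

func main() {
	calls := 0
	err := waitForLink(func() error {
		calls++
		if calls < 3 {
			return errNotFound
		}
		return nil
	}, 60, 10*time.Millisecond)
	fmt.Println(err, calls) // <nil> 3
}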
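Note on the regenerated mock: LinuxNetworkingMock exposes per-method func fields and *Calls() accessors for the renamed DSR methods. A sketch of how a test in package proxy would typically exercise it, using the field and method names from the generated code above; the test itself is illustrative and not part of the PR:

package proxy

import "testing"

func TestPrepareEndpointForDsrWithDockerMock(t *testing.T) {
	mock := &LinuxNetworkingMock{
		// Wire only the method under test; calling any unwired method panics,
		// which is moq's way of flagging unexpected interactions.
		prepareEndpointForDsrWithDockerFunc: func(containerID, endpointIP, vip string) error {
			return nil
		},
	}

	if err := mock.prepareEndpointForDsrWithDocker("abc123", "10.0.0.5", "192.0.2.10"); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	calls := mock.prepareEndpointForDsrWithDockerCalls()
	if len(calls) != 1 || calls[0].ContainerID != "abc123" {
		t.Fatalf("unexpected call record: %+v", calls)
	}
}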