diff --git a/.golangci.yml b/.golangci.yml index 600bef78e2..bb14e275be 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,2 +1,23 @@ run: timeout: 5m +linters: + disable-all: true + enable: + - deadcode + - errcheck + - gofmt + - goimports + - golint + - gosimple + - govet + - ineffassign + - misspell + - staticcheck + - structcheck + - typecheck + - unused + - varcheck +output: + format: tab + print-issued-lines: true + print-linter-name: true diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go index eba4142fff..bf18b7aa07 100644 --- a/pkg/cmd/kube-router.go +++ b/pkg/cmd/kube-router.go @@ -17,11 +17,12 @@ import ( "github.com/cloudnativelabs/kube-router/pkg/options" "github.com/golang/glog" + "time" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "time" ) // These get set at build time via -ldflags magic diff --git a/pkg/controllers/netpol/namespace.go b/pkg/controllers/netpol/namespace.go index 04020d97d7..52493531f0 100644 --- a/pkg/controllers/netpol/namespace.go +++ b/pkg/controllers/netpol/namespace.go @@ -1,10 +1,11 @@ package netpol import ( + "reflect" + "github.com/golang/glog" api "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" - "reflect" ) func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler { diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go index 48a8ba1a09..f1d834cdd1 100644 --- a/pkg/controllers/netpol/network_policy_controller.go +++ b/pkg/controllers/netpol/network_policy_controller.go @@ -5,8 +5,6 @@ import ( "encoding/base32" "errors" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "net" "regexp" "strconv" @@ -14,6 +12,9 @@ import ( "sync" "time" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "github.com/cloudnativelabs/kube-router/pkg/healthcheck" "github.com/cloudnativelabs/kube-router/pkg/metrics" "github.com/cloudnativelabs/kube-router/pkg/options" @@ -34,8 +35,8 @@ import ( const ( kubePodFirewallChainPrefix = "KUBE-POD-FW-" kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-" - kubeSourceIpSetPrefix = "KUBE-SRC-" - kubeDestinationIpSetPrefix = "KUBE-DST-" + kubeSourceIPSetPrefix = "KUBE-SRC-" + kubeDestinationIPSetPrefix = "KUBE-DST-" kubeInputChainName = "KUBE-ROUTER-INPUT" kubeForwardChainName = "KUBE-ROUTER-FORWARD" kubeOutputChainName = "KUBE-ROUTER-OUTPUT" @@ -103,7 +104,7 @@ type podInfo struct { labels map[string]string } -// internal stucture to represent NetworkPolicyIngressRule in the spec +// internal structure to represent NetworkPolicyIngressRule in the spec type ingressRule struct { matchAllPorts bool ports []protocolAndPort @@ -325,7 +326,7 @@ func (npc *NetworkPolicyController) fullPolicySync() { } } - activePolicyChains, activePolicyIpSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) + activePolicyChains, activePolicyIPSets, err := npc.syncNetworkPolicyChains(networkPoliciesInfo, syncVersion) if err != nil { glog.Errorf("Aborting sync. Failed to sync network policy chains: %v" + err.Error()) return @@ -337,7 +338,7 @@ func (npc *NetworkPolicyController) fullPolicySync() { return } - err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIpSets) + err = cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets) if err != nil { glog.Errorf("Aborting sync. 
Failed to cleanup stale iptables rules: %v", err.Error()) return @@ -357,7 +358,7 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo glog.V(2).Infof("Syncing network policy chains took %v", endTime) }() activePolicyChains := make(map[string]bool) - activePolicyIpSets := make(map[string]bool) + activePolicyIPSets := make(map[string]bool) iptablesCmdHandler, err := iptables.New() if err != nil { @@ -383,49 +384,49 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo if policy.policyType == "both" || policy.policyType == "ingress" { // create a ipset for all destination pod ip's matched by the policy spec PodSelector - targetDestPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name) - targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + targetDestPodIPSetName := policyDestinationPodIPSetName(policy.namespace, policy.name) + targetDestPodIPSet, err := npc.ipSetHandler.Create(targetDestPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) } - err = targetDestPodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0") + err = targetDestPodIPSet.Refresh(currnetPodIps, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh targetDestPodIpSet,: " + err.Error()) + glog.Errorf("failed to refresh targetDestPodIPSet,: " + err.Error()) } - err = npc.processIngressRules(policy, targetDestPodIpSetName, activePolicyIpSets, version) + err = npc.processIngressRules(policy, targetDestPodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err } - activePolicyIpSets[targetDestPodIpSet.Name] = true + activePolicyIPSets[targetDestPodIPSet.Name] = true } if policy.policyType == "both" || policy.policyType == "egress" { // create a ipset for all source pod ip's matched by the policy spec PodSelector - targetSourcePodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name) - targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + targetSourcePodIPSetName := policySourcePodIPSetName(policy.namespace, policy.name) + targetSourcePodIPSet, err := npc.ipSetHandler.Create(targetSourcePodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error()) } - err = targetSourcePodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0") + err = targetSourcePodIPSet.Refresh(currnetPodIps, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh targetSourcePodIpSet: " + err.Error()) + glog.Errorf("failed to refresh targetSourcePodIPSet: " + err.Error()) } - err = npc.processEgressRules(policy, targetSourcePodIpSetName, activePolicyIpSets, version) + err = npc.processEgressRules(policy, targetSourcePodIPSetName, activePolicyIPSets, version) if err != nil { return nil, nil, err } - activePolicyIpSets[targetSourcePodIpSet.Name] = true + activePolicyIPSets[targetSourcePodIPSet.Name] = true } } glog.V(2).Infof("Iptables chains in the filter table are synchronized with the network policies.") - return activePolicyChains, activePolicyIpSets, nil + return activePolicyChains, activePolicyIPSets, nil } func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo, - targetDestPodIpSetName string, activePolicyIpSets map[string]bool, version string) error { + 
targetDestPodIPSetName string, activePolicyIPSets map[string]bool, version string) error { // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // so no whitelist rules to be added to the network policy @@ -445,21 +446,21 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for i, ingressRule := range policy.ingressRules { if len(ingressRule.srcPods) != 0 { - srcPodIpSetName := policyIndexedSourcePodIpSetName(policy.namespace, policy.name, i) - srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + srcPodIPSetName := policyIndexedSourcePodIPSetName(policy.namespace, policy.name, i) + srcPodIPSet, err := npc.ipSetHandler.Create(srcPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[srcPodIpSet.Name] = true + activePolicyIPSets[srcPodIPSet.Name] = true - ingressRuleSrcPodIps := make([]string, 0, len(ingressRule.srcPods)) + ingressRuleSrcPodIPs := make([]string, 0, len(ingressRule.srcPods)) for _, pod := range ingressRule.srcPods { - ingressRuleSrcPodIps = append(ingressRuleSrcPodIps, pod.ip) + ingressRuleSrcPodIPs = append(ingressRuleSrcPodIPs, pod.ip) } - err = srcPodIpSet.Refresh(ingressRuleSrcPodIps, utils.OptionTimeout, "0") + err = srcPodIPSet.Refresh(ingressRuleSrcPodIPs, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh srcPodIpSet: " + err.Error()) + glog.Errorf("failed to refresh srcPodIPSet: " + err.Error()) } if len(ingressRule.ports) != 0 { @@ -468,7 +469,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { return err } } @@ -476,19 +477,19 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if len(ingressRule.namedPorts) != 0 { for j, endPoints := range ingressRule.namedPorts { - namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j) - namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[namedPortIpSet.Name] = true - err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") + activePolicyIPSets[namedPortIPSet.Name] = true + err = namedPortIPSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh namedPortIpSet: " + err.Error()) + glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) } comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err 
:= npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { return err } } @@ -499,7 +500,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIpSetName, targetDestPodIpSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", ""); err != nil { return err } } @@ -511,27 +512,27 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { return err } } for j, endPoints := range ingressRule.namedPorts { - namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j) - namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[namedPortIpSet.Name] = true + activePolicyIPSets[namedPortIPSet.Name] = true - err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") + err = namedPortIPSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh namedPortIpSet: " + err.Error()) + glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) } comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { return err } } @@ -542,47 +543,47 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllSource && ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIpSetName, "", ""); err != nil { + if err := 
npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, "", targetDestPodIPSetName, "", ""); err != nil { return err } } if len(ingressRule.srcIPBlocks) != 0 { - srcIpBlockIpSetName := policyIndexedSourceIpBlockIpSetName(policy.namespace, policy.name, i) - srcIpBlockIpSet, err := npc.ipSetHandler.Create(srcIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0") + srcIPBlockIPSetName := policyIndexedSourceIPBlockIPSetName(policy.namespace, policy.name, i) + srcIPBlockIPSet, err := npc.ipSetHandler.Create(srcIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[srcIpBlockIpSet.Name] = true - err = srcIpBlockIpSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks) + activePolicyIPSets[srcIPBlockIPSet.Name] = true + err = srcIPBlockIPSet.RefreshWithBuiltinOptions(ingressRule.srcIPBlocks) if err != nil { - glog.Errorf("failed to refresh srcIpBlockIpSet: " + err.Error()) + glog.Errorf("failed to refresh srcIPBlockIPSet: " + err.Error()) } if !ingressRule.matchAllPorts { for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, targetDestPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { return err } } for j, endPoints := range ingressRule.namedPorts { - namedPortIpSetName := policyIndexedIngressNamedPortIpSetName(policy.namespace, policy.name, i, j) - namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + namedPortIPSetName := policyIndexedIngressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[namedPortIpSet.Name] = true + activePolicyIPSets[namedPortIPSet.Name] = true - err = namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") + err = namedPortIPSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh namedPortIpSet: " + err.Error()) + glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) } comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { return err } } @@ -590,7 +591,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIpBlockIpSetName, 
targetDestPodIpSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", ""); err != nil { return err } } @@ -601,7 +602,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo } func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, - targetSourcePodIpSetName string, activePolicyIpSets map[string]bool, version string) error { + targetSourcePodIPSetName string, activePolicyIPSets map[string]bool, version string) error { // From network policy spec: "If field 'Ingress' is empty then this NetworkPolicy does not allow any traffic " // so no whitelist rules to be added to the network policy @@ -621,21 +622,21 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for i, egressRule := range policy.egressRules { if len(egressRule.dstPods) != 0 { - dstPodIpSetName := policyIndexedDestinationPodIpSetName(policy.namespace, policy.name, i) - dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + dstPodIPSetName := policyIndexedDestinationPodIPSetName(policy.namespace, policy.name, i) + dstPodIPSet, err := npc.ipSetHandler.Create(dstPodIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[dstPodIpSet.Name] = true + activePolicyIPSets[dstPodIPSet.Name] = true egressRuleDstPodIps := make([]string, 0, len(egressRule.dstPods)) for _, pod := range egressRule.dstPods { egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip) } - err = dstPodIpSet.Refresh(egressRuleDstPodIps, utils.OptionTimeout, "0") + err = dstPodIPSet.Refresh(egressRuleDstPodIps, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh dstPodIpSet: " + err.Error()) + glog.Errorf("failed to refresh dstPodIPSet: " + err.Error()) } if len(egressRule.ports) != 0 { // case where 'ports' details and 'from' details specified in the egress rule @@ -643,7 +644,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, portProtocol.protocol, portProtocol.port); err != nil { return err } } @@ -651,21 +652,21 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if len(egressRule.namedPorts) != 0 { for j, endPoints := range egressRule.namedPorts { - namedPortIpSetName := policyIndexedEgressNamedPortIpSetName(policy.namespace, policy.name, i, j) - namedPortIpSet, err := npc.ipSetHandler.Create(namedPortIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0") + namedPortIPSetName := policyIndexedEgressNamedPortIPSetName(policy.namespace, policy.name, i, j) + namedPortIPSet, err := npc.ipSetHandler.Create(namedPortIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[namedPortIpSet.Name] = true + activePolicyIPSets[namedPortIPSet.Name] = true - err 
= namedPortIpSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") + err = namedPortIPSet.Refresh(endPoints.ips, utils.OptionTimeout, "0") if err != nil { - glog.Errorf("failed to refresh namedPortIpSet: " + err.Error()) + glog.Errorf("failed to refresh namedPortIPSet: " + err.Error()) } comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, namedPortIpSetName, endPoints.protocol, endPoints.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, endPoints.protocol, endPoints.port); err != nil { return err } } @@ -677,7 +678,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstPodIpSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", ""); err != nil { return err } } @@ -689,7 +690,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port); err != nil { return err } } @@ -700,26 +701,26 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllDestinations && egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, "", "", ""); err != nil { return err } } if len(egressRule.dstIPBlocks) != 0 { - dstIpBlockIpSetName := policyIndexedDestinationIpBlockIpSetName(policy.namespace, policy.name, i) - dstIpBlockIpSet, err := npc.ipSetHandler.Create(dstIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0") + dstIPBlockIPSetName := policyIndexedDestinationIPBlockIPSetName(policy.namespace, policy.name, i) + dstIPBlockIPSet, err := npc.ipSetHandler.Create(dstIPBlockIPSetName, utils.TypeHashNet, utils.OptionTimeout, "0") if err != nil { return fmt.Errorf("failed to create ipset: %s", err.Error()) } - activePolicyIpSets[dstIpBlockIpSet.Name] = true - err = dstIpBlockIpSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks) + activePolicyIPSets[dstIPBlockIPSet.Name] = true + err = dstIPBlockIPSet.RefreshWithBuiltinOptions(egressRule.dstIPBlocks) if err != nil { - glog.Errorf("failed to refresh 
dstIpBlockIpSet: " + err.Error()) + glog.Errorf("failed to refresh dstIPBlockIPSet: " + err.Error()) } if !egressRule.matchAllPorts { for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, portProtocol.protocol, portProtocol.port); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port); err != nil { return err } } @@ -727,7 +728,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIpSetName, dstIpBlockIpSetName, "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(iptablesCmdHandler, policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", ""); err != nil { return err } } @@ -736,7 +737,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return nil } -func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIpSetName, dstIpSetName, protocol, dPort string) error { +func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler *iptables.IPTables, policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort string) error { if iptablesCmdHandler == nil { return fmt.Errorf("Failed to run iptables command: iptablesCmdHandler is nil") } @@ -744,11 +745,11 @@ func (npc *NetworkPolicyController) appendRuleToPolicyChain(iptablesCmdHandler * if comment != "" { args = append(args, "-m", "comment", "--comment", comment) } - if srcIpSetName != "" { - args = append(args, "-m", "set", "--match-set", srcIpSetName, "src") + if srcIPSetName != "" { + args = append(args, "-m", "set", "--match-set", srcIPSetName, "src") } - if dstIpSetName != "" { - args = append(args, "-m", "set", "--match-set", dstIpSetName, "dst") + if dstIPSetName != "" { + args = append(args, "-m", "set", "--match-set", dstIPSetName, "dst") } if protocol != "" { args = append(args, "-p", protocol) @@ -1071,8 +1072,8 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets } } for _, set := range ipsets.Sets { - if strings.HasPrefix(set.Name, kubeSourceIpSetPrefix) || - strings.HasPrefix(set.Name, kubeDestinationIpSetPrefix) { + if strings.HasPrefix(set.Name, kubeSourceIPSetPrefix) || + strings.HasPrefix(set.Name, kubeDestinationIPSetPrefix) { if _, ok := activePolicyIPSets[set.Name]; !ok { cleanupPolicyIPSets = append(cleanupPolicyIPSets, set) } @@ -1160,13 +1161,13 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets return nil } -func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIp string) (*map[string]podInfo, error) { +func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) for _, obj := range 
npc.podLister.List() { pod := obj.(*api.Pod) - if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { + if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { continue } for _, policy := range networkPoliciesInfo { @@ -1188,14 +1189,14 @@ func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(networkPo } -func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIp string) (*map[string]podInfo, error) { +func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(networkPoliciesInfo []networkPolicyInfo, nodeIP string) (*map[string]podInfo, error) { nodePods := make(map[string]podInfo) for _, obj := range npc.podLister.List() { pod := obj.(*api.Pod) - if strings.Compare(pod.Status.HostIP, nodeIp) != 0 { + if strings.Compare(pod.Status.HostIP, nodeIP) != 0 { continue } for _, policy := range networkPoliciesInfo { @@ -1577,52 +1578,52 @@ func networkPolicyChainName(namespace, policyName string, version string) string return kubeNetworkPolicyChainPrefix + encoded[:16] } -func policySourcePodIpSetName(namespace, policyName string) string { +func policySourcePodIPSetName(namespace, policyName string) string { hash := sha256.Sum256([]byte(namespace + policyName)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeSourceIpSetPrefix + encoded[:16] + return kubeSourceIPSetPrefix + encoded[:16] } -func policyDestinationPodIpSetName(namespace, policyName string) string { +func policyDestinationPodIPSetName(namespace, policyName string) string { hash := sha256.Sum256([]byte(namespace + policyName)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeDestinationIpSetPrefix + encoded[:16] + return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedSourcePodIpSetName(namespace, policyName string, ingressRuleNo int) string { +func policyIndexedSourcePodIPSetName(namespace, policyName string, ingressRuleNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "pod")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeSourceIpSetPrefix + encoded[:16] + return kubeSourceIPSetPrefix + encoded[:16] } -func policyIndexedDestinationPodIpSetName(namespace, policyName string, egressRuleNo int) string { +func policyIndexedDestinationPodIPSetName(namespace, policyName string, egressRuleNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "pod")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeDestinationIpSetPrefix + encoded[:16] + return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedSourceIpBlockIpSetName(namespace, policyName string, ingressRuleNo int) string { +func policyIndexedSourceIPBlockIPSetName(namespace, policyName string, ingressRuleNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + "ipblock")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeSourceIpSetPrefix + encoded[:16] + return kubeSourceIPSetPrefix + encoded[:16] } -func policyIndexedDestinationIpBlockIpSetName(namespace, policyName string, egressRuleNo int) string { +func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egressRuleNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + "ipblock")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeDestinationIpSetPrefix + 
encoded[:16] + return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedIngressNamedPortIpSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string { +func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeDestinationIpSetPrefix + encoded[:16] + return kubeDestinationIPSetPrefix + encoded[:16] } -func policyIndexedEgressNamedPortIpSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string { +func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string { hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return kubeDestinationIpSetPrefix + encoded[:16] + return kubeDestinationIPSetPrefix + encoded[:16] } // Cleanup cleanup configurations done diff --git a/pkg/controllers/netpol/network_policy_controller_test.go b/pkg/controllers/netpol/network_policy_controller_test.go index cbb311e1b2..937c410978 100644 --- a/pkg/controllers/netpol/network_policy_controller_test.go +++ b/pkg/controllers/netpol/network_policy_controller_test.go @@ -2,14 +2,15 @@ package netpol import ( "context" - netv1 "k8s.io/api/networking/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/client-go/tools/cache" "net" "strings" "testing" "time" + netv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/tools/cache" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -146,15 +147,15 @@ func tCreateFakePods(t *testing.T, podInformer cache.SharedIndexInformer, nsInfo {name: "nsC", labels: labels.Set{"name": "c"}}, {name: "nsD", labels: labels.Set{"name": "d"}}, } - ips_used := make(map[string]bool) + ipsUsed := make(map[string]bool) for _, pod := range pods { podNamespaceMap.addPod(pod) ipaddr := "1.1." 
+ pod.ip - if ips_used[ipaddr] { + if ipsUsed[ipaddr] { t.Fatalf("there is another pod with the same Ip address %s as this pod %s namespace %s", ipaddr, pod.name, pod.name) } - ips_used[ipaddr] = true + ipsUsed[ipaddr] = true tAddToInformerStore(t, podInformer, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pod.name, Labels: pod.labels, Namespace: pod.namespace}, Status: v1.PodStatus{PodIP: ipaddr}}) diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 38b26ae8f9..c23bfd411b 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -35,16 +35,16 @@ import ( ) const ( - KUBE_DUMMY_IF = "kube-dummy-if" - KUBE_TUNNEL_IF = "kube-tunnel-if" - IFACE_NOT_FOUND = "Link not found" - IFACE_HAS_ADDR = "file exists" - IFACE_HAS_NO_ADDR = "cannot assign requested address" - IPVS_SERVER_EXISTS = "file exists" - IPVS_MAGLEV_HASHING = "mh" - IPVS_SVC_F_SCHED1 = "flag-1" - IPVS_SVC_F_SCHED2 = "flag-2" - IPVS_SVC_F_SCHED3 = "flag-3" + KubeDummyIf = "kube-dummy-if" + KubeTunnelIf = "kube-tunnel-if" + IfaceNotFound = "Link not found" + IfaceHasAddr = "file exists" + IfaceHasNoAddr = "cannot assign requested address" + IpvsServerExists = "file exists" + IpvsMaglevHashing = "mh" + IpvsSvcFSched1 = "flag-1" + IpvsSvcFSched2 = "flag-2" + IpvsSvcFSched3 = "flag-3" svcDSRAnnotation = "kube-router.io/service.dsr" svcSchedulerAnnotation = "kube-router.io/service.scheduler" @@ -83,7 +83,7 @@ type ipvsCalls interface { type netlinkCalls interface { ipAddrAdd(iface netlink.Link, ip string, addRoute bool) error ipAddrDel(iface netlink.Link, ip string) error - prepareEndpointForDsr(containerId string, endpointIP string, vip string) error + prepareEndpointForDsr(containerID string, endpointIP string, vip string) error getKubeDummyInterface() (netlink.Link, error) setupRoutesForExternalIPForDSR(serviceInfoMap) error setupPolicyRoutingForDSR() error @@ -104,16 +104,16 @@ type linuxNetworking struct { func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string) error { naddr := &netlink.Addr{IPNet: &net.IPNet{IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} err := netlink.AddrDel(iface, naddr) - if err != nil && err.Error() != IFACE_HAS_NO_ADDR { + if err != nil && err.Error() != IfaceHasNoAddr { glog.Errorf("Failed to verify is external ip %s is assocated with dummy interface %s due to %s", - naddr.IPNet.IP.String(), KUBE_DUMMY_IF, err.Error()) + naddr.IPNet.IP.String(), KubeDummyIf, err.Error()) } // Delete VIP addition to "local" rt table also, fail silently if not found (DSR special case) if err == nil { - out, err := exec.Command("ip", "route", "delete", "local", ip, "dev", KUBE_DUMMY_IF, "table", "local", "proto", "kernel", "scope", "host", "src", + out, err := exec.Command("ip", "route", "delete", "local", ip, "dev", KubeDummyIf, "table", "local", "proto", "kernel", "scope", "host", "src", NodeIP.String(), "table", "local").CombinedOutput() if err != nil && !strings.Contains(string(out), "No such process") { - glog.Errorf("Failed to delete route to service VIP %s configured on %s. Error: %v, Output: %s", ip, KUBE_DUMMY_IF, err, out) + glog.Errorf("Failed to delete route to service VIP %s configured on %s. 
Error: %v, Output: %s", ip, KubeDummyIf, err, out) } } return err @@ -125,7 +125,7 @@ func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string) error { func (ln *linuxNetworking) ipAddrAdd(iface netlink.Link, ip string, addRoute bool) error { naddr := &netlink.Addr{IPNet: &net.IPNet{IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} err := netlink.AddrAdd(iface, naddr) - if err != nil && err.Error() != IFACE_HAS_ADDR { + if err != nil && err.Error() != IfaceHasAddr { glog.Errorf("Failed to assign cluster ip %s to dummy interface: %s", naddr.IPNet.IP.String(), err.Error()) return err @@ -142,10 +142,10 @@ func (ln *linuxNetworking) ipAddrAdd(iface netlink.Link, ip string, addRoute boo // TODO: netlink.RouteReplace which is replacement for below command is not working as expected. Call succeeds but // route is not replaced. For now do it with command. - out, err := exec.Command("ip", "route", "replace", "local", ip, "dev", KUBE_DUMMY_IF, "table", "local", "proto", "kernel", "scope", "host", "src", + out, err := exec.Command("ip", "route", "replace", "local", ip, "dev", KubeDummyIf, "table", "local", "proto", "kernel", "scope", "host", "src", NodeIP.String(), "table", "local").CombinedOutput() if err != nil { - glog.Errorf("Failed to replace route to service VIP %s configured on %s. Error: %v, Output: %s", ip, KUBE_DUMMY_IF, err, out) + glog.Errorf("Failed to replace route to service VIP %s configured on %s. Error: %v, Output: %s", ip, KubeDummyIf, err, out) } return nil } @@ -213,7 +213,7 @@ type NetworkServicesController struct { globalHairpin bool ipvsPermitAll bool client kubernetes.Interface - nodeportBindOnAllIp bool + nodeportBindOnAllIP bool MetricsEnabled bool ln LinuxNetworking readyForUpdates bool @@ -843,7 +843,7 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) { type externalIPService struct { ipvsSvc *ipvs.Service - externalIp string + externalIP string } func hasActiveEndpoints(svc *serviceInfo, endpoints []endpointsInfo) bool { @@ -872,7 +872,7 @@ func (nsc *NetworkServicesController) getPodObjectForEndpoint(endpointIP string) // - enter process network namespace and create ipip tunnel // - add VIP to the tunnel interface // - disable rp_filter -func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP string, vip string) error { +func (ln *linuxNetworking) prepareEndpointForDsr(containerID string, endpointIP string, vip string) error { // FIXME: its possible switch namespaces may never work safely in GO without hacks. 
// https://groups.google.com/forum/#!topic/golang-nuts/ss1gEOcehjk/discussion @@ -904,7 +904,7 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP } defer dockerClient.Close() - containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerId) + containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerID) if err != nil { return errors.New("Failed to get docker container spec due to " + err.Error()) } @@ -932,9 +932,9 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP // way to switch back to old namespace, pretty much all things will go wrong if we dont switch back // create a ipip tunnel interface inside the endpoint container - tunIf, err := netlink.LinkByName(KUBE_TUNNEL_IF) + tunIf, err := netlink.LinkByName(KubeTunnelIf) if err != nil { - if err.Error() != IFACE_NOT_FOUND { + if err.Error() != IfaceNotFound { err = netns.Set(hostNetworkNamespaceHandle) if err != nil { return errors.New("Failed to get hostNetworkNamespace due to " + err.Error()) @@ -945,9 +945,9 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP return errors.New("Failed to verify if ipip tunnel interface exists in endpoint " + endpointIP + " namespace due to " + err.Error()) } - glog.V(2).Infof("Could not find tunnel interface " + KUBE_TUNNEL_IF + " in endpoint " + endpointIP + " so creating one.") + glog.V(2).Infof("Could not find tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + " so creating one.") ipTunLink := netlink.Iptun{ - LinkAttrs: netlink.LinkAttrs{Name: KUBE_TUNNEL_IF}, + LinkAttrs: netlink.LinkAttrs{Name: KubeTunnelIf}, Local: net.ParseIP(endpointIP), } err = netlink.LinkAdd(&ipTunLink) @@ -966,12 +966,12 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP // need to find the root cause for retry := 0; retry < 60; retry++ { time.Sleep(100 * time.Millisecond) - tunIf, err = netlink.LinkByName(KUBE_TUNNEL_IF) + tunIf, err = netlink.LinkByName(KubeTunnelIf) if err == nil { break } - if err != nil && err.Error() == IFACE_NOT_FOUND { - glog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KUBE_TUNNEL_IF) + if err != nil && err.Error() == IfaceNotFound { + glog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf) continue } else { break @@ -986,10 +986,10 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP activeNetworkNamespaceHandle, err = netns.Get() glog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) activeNetworkNamespaceHandle.Close() - return errors.New("Failed to get " + KUBE_TUNNEL_IF + " tunnel interface handle due to " + err.Error()) + return errors.New("Failed to get " + KubeTunnelIf + " tunnel interface handle due to " + err.Error()) } - glog.V(2).Infof("Successfully created tunnel interface " + KUBE_TUNNEL_IF + " in endpoint " + endpointIP + ".") + glog.V(2).Infof("Successfully created tunnel interface " + KubeTunnelIf + " in endpoint " + endpointIP + ".") } // bring the tunnel interface up @@ -1007,7 +1007,7 @@ func (ln *linuxNetworking) prepareEndpointForDsr(containerId string, endpointIP // assign VIP to the KUBE_TUNNEL_IF interface err = ln.ipAddrAdd(tunIf, vip, false) - if err != nil && err.Error() != IFACE_HAS_ADDR { + if err != nil && err.Error() != IfaceHasAddr { err = netns.Set(hostNetworkNamespaceHandle) if err != nil { 
return errors.New("Failed to set hostNetworkNamespace handle due to " + err.Error()) @@ -1128,13 +1128,13 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { svcInfo.scheduler = ipvs.DestinationHashing } else if schedulingMethod == ipvs.SourceHashing { svcInfo.scheduler = ipvs.SourceHashing - } else if schedulingMethod == IPVS_MAGLEV_HASHING { - svcInfo.scheduler = IPVS_MAGLEV_HASHING + } else if schedulingMethod == IpvsMaglevHashing { + svcInfo.scheduler = IpvsMaglevHashing } } flags, ok := svc.ObjectMeta.Annotations[svcSchedFlagsAnnotation] - if ok && svcInfo.scheduler == IPVS_MAGLEV_HASHING { + if ok && svcInfo.scheduler == IpvsMaglevHashing { svcInfo.flags = parseSchedFlags(flags) } @@ -1158,8 +1158,8 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { svcInfo.local = true } - svcId := generateServiceId(svc.Namespace, svc.Name, port.Name) - serviceMap[svcId] = &svcInfo + svcID := generateServiceID(svc.Namespace, svc.Name, port.Name) + serviceMap[svcID] = &svcInfo } } return serviceMap @@ -1175,11 +1175,11 @@ func parseSchedFlags(value string) schedFlags { flags := strings.Split(value, ",") for _, flag := range flags { switch strings.Trim(flag, " ") { - case IPVS_SVC_F_SCHED1: + case IpvsSvcFSched1: flag1 = true - case IPVS_SVC_F_SCHED2: + case IpvsSvcFSched2: flag2 = true - case IPVS_SVC_F_SCHED3: + case IpvsSvcFSched3: flag3 = true default: } @@ -1203,13 +1203,13 @@ func (nsc *NetworkServicesController) buildEndpointsInfo() endpointsInfoMap { for _, epSubset := range ep.Subsets { for _, port := range epSubset.Ports { - svcId := generateServiceId(ep.Namespace, ep.Name, port.Name) + svcID := generateServiceID(ep.Namespace, ep.Name, port.Name) endpoints := make([]endpointsInfo, 0) for _, addr := range epSubset.Addresses { isLocal := addr.NodeName != nil && *addr.NodeName == nsc.nodeHostName endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port), isLocal: isLocal}) } - endpointsMap[svcId] = shuffle(endpoints) + endpointsMap[svcID] = shuffle(endpoints) } } } @@ -1771,7 +1771,7 @@ func (ln *linuxNetworking) ipvsAddServer(service *ipvs.Service, dest *ipvs.Desti return nil } - if strings.Contains(err.Error(), IPVS_SERVER_EXISTS) { + if strings.Contains(err.Error(), IpvsServerExists) { err = ln.ipvsUpdateDestination(service, dest) if err != nil { return fmt.Errorf("Failed to update ipvs destination %s to the ipvs service %s due to : %s", @@ -1790,7 +1790,7 @@ func (ln *linuxNetworking) ipvsAddServer(service *ipvs.Service, dest *ipvs.Desti const ( customDSRRouteTableID = "78" customDSRRouteTableName = "kube-router-dsr" - externalIPRouteTableId = "79" + externalIPRouteTableID = "79" externalIPRouteTableName = "external_ip" ) @@ -1906,7 +1906,7 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service return errors.New("Failed setup external ip routing table required for DSR due to " + err.Error()) } defer f.Close() - if _, err = f.WriteString(externalIPRouteTableId + " " + externalIPRouteTableName + "\n"); err != nil { + if _, err = f.WriteString(externalIPRouteTableID + " " + externalIPRouteTableName + "\n"); err != nil { return errors.New("Failed setup external ip routing table required for DSR due to " + err.Error()) } } @@ -1916,15 +1916,15 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service return errors.New("Failed to verify if `ip rule add prio 32765 from all lookup external_ip` exists due to: " + err.Error()) } - if !(strings.Contains(string(out), 
externalIPRouteTableName) || strings.Contains(string(out), externalIPRouteTableId)) { - err = exec.Command("ip", "rule", "add", "prio", "32765", "from", "all", "lookup", externalIPRouteTableId).Run() + if !(strings.Contains(string(out), externalIPRouteTableName) || strings.Contains(string(out), externalIPRouteTableID)) { + err = exec.Command("ip", "rule", "add", "prio", "32765", "from", "all", "lookup", externalIPRouteTableID).Run() if err != nil { glog.Infof("Failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` due to " + err.Error()) return errors.New("Failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` due to " + err.Error()) } } - out, _ = exec.Command("ip", "route", "list", "table", externalIPRouteTableId).Output() + out, _ = exec.Command("ip", "route", "list", "table", externalIPRouteTableID).Output() outStr := string(out) activeExternalIPs := make(map[string]bool) for _, svc := range serviceInfoMap { @@ -1938,7 +1938,7 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service if !strings.Contains(outStr, externalIP) { if err = exec.Command("ip", "route", "add", externalIP, "dev", "kube-bridge", "table", - externalIPRouteTableId).Run(); err != nil { + externalIPRouteTableID).Run(); err != nil { glog.Error("Failed to add route for " + externalIP + " in custom route table for external IP's due to: " + err.Error()) continue } @@ -1946,14 +1946,14 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service } } - // check if there are any pbr in externalIPRouteTableId for external IP's + // check if there are any pbr in externalIPRouteTableID for external IP's if len(outStr) > 0 { // clean up stale external IPs for _, line := range strings.Split(strings.Trim(outStr, "\n"), "\n") { route := strings.Split(strings.Trim(line, " "), " ") ip := route[0] if !activeExternalIPs[ip] { - args := []string{"route", "del", "table", externalIPRouteTableId} + args := []string{"route", "del", "table", externalIPRouteTableID} args = append(args, route...) 
if err = exec.Command("ip", args...).Run(); err != nil { glog.Errorf("Failed to del route for %v in custom route table for external IP's due to: %s", ip, err) @@ -1972,16 +1972,16 @@ func isEndpointsForLeaderElection(ep *api.Endpoints) bool { } // unique identifier for a load-balanced service (namespace + name + portname) -func generateServiceId(namespace, svcName, port string) string { +func generateServiceID(namespace, svcName, port string) string { return namespace + "-" + svcName + "-" + port } // unique identifier for a load-balanced service (namespace + name + portname) -func generateIpPortId(ip, protocol, port string) string { +func generateIPPortID(ip, protocol, port string) string { return ip + "-" + protocol + "-" + port } -func generateEndpointId(ip, port string) string { +func generateEndpointID(ip, port string) string { return ip + ":" + port } @@ -2017,14 +2017,14 @@ func getAllLocalIPs() ([]netlink.Addr, error) { func (ln *linuxNetworking) getKubeDummyInterface() (netlink.Link, error) { var dummyVipInterface netlink.Link - dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF) - if err != nil && err.Error() == IFACE_NOT_FOUND { - glog.V(1).Infof("Could not find dummy interface: " + KUBE_DUMMY_IF + " to assign cluster ip's, creating one") - err = netlink.LinkAdd(&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: KUBE_DUMMY_IF}}) + dummyVipInterface, err := netlink.LinkByName(KubeDummyIf) + if err != nil && err.Error() == IfaceNotFound { + glog.V(1).Infof("Could not find dummy interface: " + KubeDummyIf + " to assign cluster ip's, creating one") + err = netlink.LinkAdd(&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: KubeDummyIf}}) if err != nil { return nil, errors.New("Failed to add dummy interface: " + err.Error()) } - dummyVipInterface, err = netlink.LinkByName(KUBE_DUMMY_IF) + dummyVipInterface, err = netlink.LinkByName(KubeDummyIf) if err != nil { return nil, errors.New("Failed to get dummy interface: " + err.Error()) } @@ -2066,15 +2066,15 @@ func (nsc *NetworkServicesController) Cleanup() { nsc.cleanupIpvsFirewall() // delete dummy interface used to assign cluster IP's - dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF) + dummyVipInterface, err := netlink.LinkByName(KubeDummyIf) if err != nil { - if err.Error() != IFACE_NOT_FOUND { - glog.Infof("Dummy interface: " + KUBE_DUMMY_IF + " does not exist") + if err.Error() != IfaceNotFound { + glog.Infof("Dummy interface: " + KubeDummyIf + " does not exist") } } else { err = netlink.LinkDel(dummyVipInterface) if err != nil { - glog.Errorf("Could not delete dummy interface " + KUBE_DUMMY_IF + " due to " + err.Error()) + glog.Errorf("Could not delete dummy interface " + KubeDummyIf + " due to " + err.Error()) return } } @@ -2235,8 +2235,8 @@ func NewNetworkServicesController(clientset kubernetes.Interface, nsc.masqueradeAll = true } - if config.NodePortBindOnAllIp { - nsc.nodeportBindOnAllIp = true + if config.NodePortBindOnAllIP { + nsc.nodeportBindOnAllIP = true } if config.RunRouter { diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index c4935002a7..3d9876a361 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -114,8 +114,8 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error()) continue } - var clusterServiceId = generateIpPortId(svc.clusterIP.String(), svc.protocol, 
strconv.Itoa(svc.port)) - activeServiceEndpointMap[clusterServiceId] = make([]string, 0) + var clusterServiceID = generateIPPortID(svc.clusterIP.String(), svc.protocol, strconv.Itoa(svc.port)) + activeServiceEndpointMap[clusterServiceID] = make([]string, 0) // add IPVS remote server to the IPVS service for _, endpoint := range endpoints { @@ -139,7 +139,7 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv if err != nil { glog.Errorf(err.Error()) } else { - activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[clusterServiceID] = append(activeServiceEndpointMap[clusterServiceID], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } @@ -178,7 +178,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi var nodeServiceIds []string - if nsc.nodeportBindOnAllIp { + if nsc.nodeportBindOnAllIP { // bind on all interfaces instead addrs, err := getAllLocalIPs() @@ -202,7 +202,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi continue } - nodeServiceIds[i] = generateIpPortId(addr.IP.String(), svc.protocol, strconv.Itoa(svc.nodePort)) + nodeServiceIds[i] = generateIPPortID(addr.IP.String(), svc.protocol, strconv.Itoa(svc.nodePort)) activeServiceEndpointMap[nodeServiceIds[i]] = make([]string, 0) } } else { @@ -214,7 +214,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi } nodeServiceIds = make([]string, 1) - nodeServiceIds[0] = generateIpPortId(nsc.nodeIP.String(), svc.protocol, strconv.Itoa(svc.nodePort)) + nodeServiceIds[0] = generateIPPortID(nsc.nodeIP.String(), svc.protocol, strconv.Itoa(svc.nodePort)) activeServiceEndpointMap[nodeServiceIds[0]] = make([]string, 0) } @@ -231,7 +231,7 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi if err != nil { glog.Errorf(err.Error()) } else { - activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } @@ -264,7 +264,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser return errors.New("Failed creating dummy interface: " + err.Error()) } - externalIpServices := make([]externalIPService, 0) + externalIPServices := make([]externalIPService, 0) // create IPVS service for the service to be exposed through the external IP's // For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services // based on FWMARK to enable Direct server return functionality. 
DSR requires a director @@ -285,23 +285,23 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser continue } for _, externalIP := range extIPSet.List() { - var externalIpServiceId string + var externalIPServiceID string if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" { ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error()) continue } - externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP}) + externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) fwMark, err := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)) if err != nil { glog.Errorf("Failed to generate Fwmark") continue } - externalIpServiceId = fmt.Sprint(fwMark) + externalIPServiceID = fmt.Sprint(fwMark) // ensure there is iptables mangle table rule to FWMARK the packet - err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIpServiceId) + err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIPServiceID) if err != nil { glog.Errorf("Failed to setup mangle table rule to FMWARD the traffic to external IP") continue @@ -310,7 +310,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser // ensure VIP less director. we dont assign VIP to any interface err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP) if err != nil { - glog.Errorf("Failed to delete external ip adress from dummyVipInterface due to %s", err) + glog.Errorf("Failed to delete external ip address from dummyVipInterface due to %s", err) continue } // do policy routing to deliver the packet locally so that IPVS can pick the packet @@ -323,8 +323,8 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } else { // ensure director with vip assigned err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true) - if err != nil && err.Error() != IFACE_HAS_ADDR { - glog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KUBE_DUMMY_IF, err.Error()) + if err != nil && err.Error() != IfaceHasAddr { + glog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KubeDummyIf, err.Error()) } // create IPVS service for the service to be exposed through the external ip @@ -333,8 +333,8 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error()) continue } - externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP}) - externalIpServiceId = generateIpPortId(externalIP, svc.protocol, strconv.Itoa(svc.port)) + externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) + externalIPServiceID = generateIPPortID(externalIP, svc.protocol, strconv.Itoa(svc.port)) // ensure there is NO iptables mangle table rule to FWMARK the packet fwmark, err := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)) @@ -350,10 +350,10 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } } - 
activeServiceEndpointMap[externalIpServiceId] = make([]string, 0) + activeServiceEndpointMap[externalIPServiceID] = make([]string, 0) for _, endpoint := range endpoints { if !svc.local || (svc.local && endpoint.isLocal) { - activeServiceEndpointMap[externalIpServiceId] = append(activeServiceEndpointMap[externalIpServiceId], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[externalIPServiceID] = append(activeServiceEndpointMap[externalIPServiceID], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } @@ -367,7 +367,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser Weight: 1, } - for _, externalIpService := range externalIpServices { + for _, externalIPService := range externalIPServices { if svc.local && !endpoint.isLocal { continue } @@ -377,7 +377,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } // add server to IPVS service - err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst) + err := nsc.ln.ipvsAddServer(externalIPService.ipvsSvc, &dst) if err != nil { glog.Errorf(err.Error()) } @@ -402,7 +402,7 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser continue } - err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIpService.externalIp) + err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIPService.externalIP) if err != nil { glog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error()) } @@ -436,7 +436,7 @@ func (nsc *NetworkServicesController) cleanupStaleVIPs(activeServiceEndpointMap glog.V(1).Info("Cleaning up any old service IPs on the dummy interface") addrActive := make(map[string]bool) for k := range activeServiceEndpointMap { - // verify active and its a generateIpPortId() type service + // verify active and it's a generateIPPortID() type service if strings.Contains(k, "-") { parts := strings.SplitN(k, "-", 3) addrActive[parts[0]] = true @@ -488,16 +488,16 @@ func (nsc *NetworkServicesController) cleanupStaleIPVSConfig(activeServiceEndpoi } var key string if ipvsSvc.Address != nil { - key = generateIpPortId(ipvsSvc.Address.String(), protocol, strconv.Itoa(int(ipvsSvc.Port))) + key = generateIPPortID(ipvsSvc.Address.String(), protocol, strconv.Itoa(int(ipvsSvc.Port))) } else if ipvsSvc.FWMark != 0 { key = fmt.Sprint(ipvsSvc.FWMark) } else { continue } - endpointIds, ok := activeServiceEndpointMap[key] + endpointIDs, ok := activeServiceEndpointMap[key] // Only delete the service if it's not there anymore to prevent flapping - // old: if !ok || len(endpointIds) == 0 { + // old: if !ok || len(endpointIDs) == 0 { if !ok { excluded := false for _, excludedCidr := range nsc.excludedCidrs { @@ -528,8 +528,8 @@ func (nsc *NetworkServicesController) cleanupStaleIPVSConfig(activeServiceEndpoi } for _, dst := range dsts { validEp := false - for _, epId := range endpointIds { - if epId == generateEndpointId(dst.Address.String(), strconv.Itoa(int(dst.Port))) { + for _, epID := range endpointIDs { + if epID == generateEndpointID(dst.Address.String(), strconv.Itoa(int(dst.Port))) { validEp = true break } diff --git a/pkg/controllers/routing/aws.go b/pkg/controllers/routing/aws.go index 25ba35b28e..1b8810540f 100644 --- a/pkg/controllers/routing/aws.go +++ b/pkg/controllers/routing/aws.go @@ -18,7 +18,7 @@ import ( // disableSourceDestinationCheck disables src-dst check of all the VMs when the cluster // is provisioned on AWS.
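The aws.go hunk that follows only corrects a comment, but the operation it documents is a single EC2 API call. A minimal sketch with the aws-sdk-go v1 client, assuming the instance ID has already been derived from the node's provider ID (the function name here is illustrative; kube-router's real implementation walks the node lister and handles credentials and errors in more detail):

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

// disableSrcDstCheck turns off the EC2 source/destination check so the VM
// may forward packets whose addresses are not its own (pod-to-pod traffic).
func disableSrcDstCheck(instanceID string) error {
	sess := session.Must(session.NewSession())
	_, err := ec2.New(sess).ModifyInstanceAttribute(&ec2.ModifyInstanceAttributeInput{
		InstanceId:      aws.String(instanceID),
		SourceDestCheck: &ec2.AttributeBooleanValue{Value: aws.Bool(false)},
	})
	return err
}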
EC2 by default drops any packets originating from or destined // to a VM with an IP other than the VM's own IP. This check needs to be disabled so that -// cross node pod-to-pod traffic can be sent and recived by a VM. +// cross node pod-to-pod traffic can be sent and received by a VM. func (nrc *NetworkRoutingController) disableSourceDestinationCheck() { nodes := nrc.nodeLister.List() diff --git a/pkg/controllers/routing/bgp_peers.go b/pkg/controllers/routing/bgp_peers.go index 1e42747c9e..0545199b36 100644 --- a/pkg/controllers/routing/bgp_peers.go +++ b/pkg/controllers/routing/bgp_peers.go @@ -198,7 +198,7 @@ func (nrc *NetworkRoutingController) syncInternalPeers() { // connectToExternalBGPPeers adds all the configured eBGP peers (global or node specific) as neighbours func connectToExternalBGPPeers(server *gobgp.BgpServer, peerNeighbors []*config.Neighbor, bgpGracefulRestart bool, bgpGracefulRestartDeferralTime time.Duration, - bgpGracefulRestartTime time.Duration, peerMultihopTtl uint8) error { + bgpGracefulRestartTime time.Duration, peerMultihopTTL uint8) error { for _, n := range peerNeighbors { if bgpGracefulRestart { @@ -238,15 +238,15 @@ func connectToExternalBGPPeers(server *gobgp.BgpServer, peerNeighbors []*config. }, } } - if peerMultihopTtl > 1 { + if peerMultihopTTL > 1 { n.EbgpMultihop = config.EbgpMultihop{ Config: config.EbgpMultihopConfig{ Enabled: true, - MultihopTtl: peerMultihopTtl, + MultihopTtl: peerMultihopTTL, }, State: config.EbgpMultihopState{ Enabled: true, - MultihopTtl: peerMultihopTtl, + MultihopTtl: peerMultihopTTL, }, } } @@ -284,7 +284,7 @@ func newGlobalPeers(ips []net.IP, ports []uint16, asns []uint32, passwords []str return nil, errors.New("Invalid peer router config. " + "The number of ports should either be zero, or one per peer router."
+ " If blank items are used, it will default to standard BGP port, " + - strconv.Itoa(options.DEFAULT_BGP_PORT) + "\n" + + strconv.Itoa(options.DefaultBgpPort) + "\n" + "Example: \"port,,port\" OR [\"port\",\"\",\"port\"].") } @@ -306,7 +306,7 @@ func newGlobalPeers(ips []net.IP, ports []uint16, asns []uint32, passwords []str Timers: config.Timers{Config: config.TimersConfig{HoldTime: holdtime}}, Transport: config.Transport{ Config: config.TransportConfig{ - RemotePort: options.DEFAULT_BGP_PORT, + RemotePort: options.DefaultBgpPort, }, }, } diff --git a/pkg/controllers/routing/bgp_policies.go b/pkg/controllers/routing/bgp_policies.go index de17ab7091..7564ba77b0 100644 --- a/pkg/controllers/routing/bgp_policies.go +++ b/pkg/controllers/routing/bgp_policies.go @@ -3,6 +3,7 @@ package routing import ( "errors" "fmt" + "github.com/golang/glog" "github.com/cloudnativelabs/kube-router/pkg/utils" @@ -141,7 +142,7 @@ func (nrc *NetworkRoutingController) AddPolicies() error { // BGP export policies are added so that following conditions are met: // -// - by default export of all routes from the RIB to the neighbour's is denied, and explicity statements are added i +// - by default export of all routes from the RIB to the neighbour's is denied, and explicitly statements are added i // to permit the desired routes to be exported // - each node is allowed to advertise its assigned pod CIDR's to all of its iBGP peer neighbours with same ASN if --enable-ibgp=true // - each node is allowed to advertise its assigned pod CIDR's to all of its external BGP peer neighbours diff --git a/pkg/controllers/routing/ecmp_vip.go b/pkg/controllers/routing/ecmp_vip.go index 552a1bd803..bd6b189e09 100644 --- a/pkg/controllers/routing/ecmp_vip.go +++ b/pkg/controllers/routing/ecmp_vip.go @@ -174,7 +174,7 @@ func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld func (nrc *NetworkRoutingController) getWithdraw(svcOld, svcNew *v1core.Service) (out []string) { if svcOld != nil && svcNew != nil { - out = getMissingPrevGen(nrc.getExternalIps(svcOld), nrc.getExternalIps(svcNew)) + out = getMissingPrevGen(nrc.getExternalIPs(svcOld), nrc.getExternalIPs(svcNew)) } return } @@ -273,43 +273,43 @@ func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) ( return item, nil } -func (nrc *NetworkRoutingController) getClusterIp(svc *v1core.Service) string { - clusterIp := "" +func (nrc *NetworkRoutingController) getClusterIP(svc *v1core.Service) string { + clusterIP := "" if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" { // skip headless services if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { - clusterIp = svc.Spec.ClusterIP + clusterIP = svc.Spec.ClusterIP } } - return clusterIp + return clusterIP } -func (nrc *NetworkRoutingController) getExternalIps(svc *v1core.Service) []string { - externalIpList := make([]string, 0) +func (nrc *NetworkRoutingController) getExternalIPs(svc *v1core.Service) []string { + externalIPList := make([]string, 0) if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" { // skip headless services if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { - externalIpList = append(externalIpList, svc.Spec.ExternalIPs...) + externalIPList = append(externalIPList, svc.Spec.ExternalIPs...) 
} } - return externalIpList + return externalIPList } -func (nrc *NetworkRoutingController) getLoadBalancerIps(svc *v1core.Service) []string { - loadBalancerIpList := make([]string, 0) +func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []string { + loadBalancerIPList := make([]string, 0) if svc.Spec.Type == "LoadBalancer" { // skip headless services if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" { for _, lbIngress := range svc.Status.LoadBalancer.Ingress { if len(lbIngress.IP) > 0 { - loadBalancerIpList = append(loadBalancerIpList, lbIngress.IP) + loadBalancerIPList = append(loadBalancerIPList, lbIngress.IP) } } } } - return loadBalancerIpList + return loadBalancerIPList } func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) { @@ -384,21 +384,21 @@ func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) [ ipList := make([]string, 0) if nrc.shouldAdvertiseService(svc, svcAdvertiseClusterAnnotation, nrc.advertiseClusterIP) { - clusterIP := nrc.getClusterIp(svc) + clusterIP := nrc.getClusterIP(svc) if clusterIP != "" { ipList = append(ipList, clusterIP) } } if nrc.shouldAdvertiseService(svc, svcAdvertiseExternalAnnotation, nrc.advertiseExternalIP) { - ipList = append(ipList, nrc.getExternalIps(svc)...) + ipList = append(ipList, nrc.getExternalIPs(svc)...) } // Deprecated: Use service.advertise.loadbalancer=false instead of service.skiplbips. _, skiplbips := svc.Annotations[svcSkipLbIpsAnnotation] advertiseLoadBalancer := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, nrc.advertiseLoadBalancerIP) if advertiseLoadBalancer && !skiplbips { - ipList = append(ipList, nrc.getLoadBalancerIps(svc)...) + ipList = append(ipList, nrc.getLoadBalancerIPs(svc)...) 
} return ipList diff --git a/pkg/controllers/routing/network_routes_controller.go b/pkg/controllers/routing/network_routes_controller.go index d2dad6ac5f..14b3ec870f 100644 --- a/pkg/controllers/routing/network_routes_controller.go +++ b/pkg/controllers/routing/network_routes_controller.go @@ -34,7 +34,7 @@ import ( ) const ( - IFACE_NOT_FOUND = "Link not found" + IfaceNotFound = "Link not found" customRouteTableID = "77" customRouteTableName = "kube-router" @@ -67,7 +67,7 @@ type NetworkRoutingController struct { nodeName string nodeSubnet net.IPNet nodeInterface string - routerId string + routerID string isIpv6 bool activeNodes map[string]bool mu sync.Mutex @@ -189,7 +189,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll // create 'kube-bridge' interface to which pods will be connected _, err = netlink.LinkByName("kube-bridge") - if err != nil && err.Error() == IFACE_NOT_FOUND { + if err != nil && err.Error() == IfaceNotFound { linkAttrs := netlink.NewLinkAttrs() linkAttrs.Name = "kube-bridge" bridge := &netlink.Bridge{LinkAttrs: linkAttrs} @@ -594,9 +594,8 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error { func (nrc *NetworkRoutingController) newIptablesCmdHandler() (*iptables.IPTables, error) { if nrc.isIpv6 { return iptables.NewWithProtocol(iptables.ProtocolIPv6) - } else { - return iptables.NewWithProtocol(iptables.ProtocolIPv4) } + return iptables.NewWithProtocol(iptables.ProtocolIPv4) } // ensure there is rule in filter table and FORWARD chain to permit in/out traffic from pods @@ -741,7 +740,7 @@ func (nrc *NetworkRoutingController) startBgpServer() error { global := &config.Global{ Config: config.GlobalConfig{ As: nodeAsnNumber, - RouterId: nrc.routerId, + RouterId: nrc.routerID, LocalAddressList: localAddressList, Port: int32(nrc.bgpPort), }, @@ -882,7 +881,7 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.bgpGracefulRestart = kubeRouterConfig.BGPGracefulRestart nrc.bgpGracefulRestartDeferralTime = kubeRouterConfig.BGPGracefulRestartDeferralTime nrc.bgpGracefulRestartTime = kubeRouterConfig.BGPGracefulRestartTime - nrc.peerMultihopTTL = kubeRouterConfig.PeerMultihopTtl + nrc.peerMultihopTTL = kubeRouterConfig.PeerMultihopTTL nrc.enablePodEgress = kubeRouterConfig.EnablePodEgress nrc.syncPeriod = kubeRouterConfig.RoutesSyncPeriod nrc.overrideNextHop = kubeRouterConfig.OverrideNextHop @@ -894,9 +893,9 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.disableSrcDstCheck = kubeRouterConfig.DisableSrcDstCheck nrc.initSrcDstCheckDone = false - nrc.bgpHoldtime = kubeRouterConfig.BGPHoldtime.Seconds() + nrc.bgpHoldtime = kubeRouterConfig.BGPHoldTime.Seconds() if nrc.bgpHoldtime > 65536 || nrc.bgpHoldtime < 3 { - return nil, errors.New("This is an incorrect BGP holdtime range, holdtime must be in the range 3s to 18h12m16s.") + return nil, errors.New("this is an incorrect BGP holdtime range, holdtime must be in the range 3s to 18h12m16s") } nrc.hostnameOverride = kubeRouterConfig.HostnameOverride @@ -914,13 +913,13 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.nodeIP = nodeIP nrc.isIpv6 = nodeIP.To4() == nil - if kubeRouterConfig.RouterId != "" { - nrc.routerId = kubeRouterConfig.RouterId + if kubeRouterConfig.RouterID != "" { + nrc.routerID = kubeRouterConfig.RouterID } else { if nrc.isIpv6 { return nil, errors.New("Router-id must be specified in ipv6 operation") } - nrc.routerId = nrc.nodeIP.String() + nrc.routerID = nrc.nodeIP.String() } // let's start with the
assumption that we have the necessary IAM creds to access the EC2 API @@ -972,9 +971,9 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.defaultNodeAsnNumber = 64512 // this magic number is first of the private ASN range, use it as default } - nrc.advertiseClusterIP = kubeRouterConfig.AdvertiseClusterIp - nrc.advertiseExternalIP = kubeRouterConfig.AdvertiseExternalIp - nrc.advertiseLoadBalancerIP = kubeRouterConfig.AdvertiseLoadBalancerIp + nrc.advertiseClusterIP = kubeRouterConfig.AdvertiseClusterIP + nrc.advertiseExternalIP = kubeRouterConfig.AdvertiseExternalIP + nrc.advertiseLoadBalancerIP = kubeRouterConfig.AdvertiseLoadBalancerIP nrc.advertisePodCidr = kubeRouterConfig.AdvertiseNodePodCidr nrc.enableOverlays = kubeRouterConfig.EnableOverlay nrc.overlayType = kubeRouterConfig.OverlayType diff --git a/pkg/controllers/routing/network_routes_controller_test.go b/pkg/controllers/routing/network_routes_controller_test.go index e66d4fd37c..20c7fafbcc 100644 --- a/pkg/controllers/routing/network_routes_controller_test.go +++ b/pkg/controllers/routing/network_routes_controller_test.go @@ -1354,7 +1354,7 @@ func Test_routeReflectorConfiguration(t *testing.T) { node *v1core.Node expectedRRServer bool expectedRRClient bool - expectedClusterId string + expectedClusterID string expectedBgpToStart bool }{ { @@ -1540,8 +1540,8 @@ func Test_routeReflectorConfiguration(t *testing.T) { if testcase.expectedRRClient != testcase.nrc.bgpRRClient { t.Error("Node supposed to be RR client") } - if testcase.expectedClusterId != testcase.nrc.bgpClusterID { - t.Errorf("Node suppose to have cluster id '%s' but got %s", testcase.expectedClusterId, testcase.nrc.bgpClusterID) + if testcase.expectedClusterID != testcase.nrc.bgpClusterID { + t.Errorf("Node supposed to have cluster id '%s' but got %s", testcase.expectedClusterID, testcase.nrc.bgpClusterID) } } else { if err == nil { diff --git a/pkg/controllers/routing/utils.go b/pkg/controllers/routing/utils.go index c05dc13a50..1a4b918edf 100644 --- a/pkg/controllers/routing/utils.go +++ b/pkg/controllers/routing/utils.go @@ -99,7 +99,7 @@ func ipv6IsEnabled() bool { return true } -func getNodeSubnet(nodeIp net.IP) (net.IPNet, string, error) { +func getNodeSubnet(nodeIP net.IP) (net.IPNet, string, error) { links, err := netlink.LinkList() if err != nil { return net.IPNet{}, "", errors.New("Failed to get list of links") @@ -110,7 +110,7 @@ func getNodeSubnet(nodeIp net.IP) (net.IPNet, string, error) { return net.IPNet{}, "", errors.New("Failed to get list of addr") } for _, addr := range addresses { - if addr.IPNet.IP.Equal(nodeIp) { + if addr.IPNet.IP.Equal(nodeIP) { return *addr.IPNet, link.Attrs().Name, nil } } diff --git a/pkg/options/options.go b/pkg/options/options.go index 7335f8652e..3aa5fa3f63 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -9,18 +9,18 @@ import ( "github.com/spf13/pflag" ) -const DEFAULT_BGP_PORT = 179 -const DEFAULT_BGP_HOLDTIME time.Duration = 90 * time.Second +const DefaultBgpPort = 179 +const DefaultBgpHoldTime time.Duration = 90 * time.Second type KubeRouterConfig struct { - AdvertiseClusterIp bool - AdvertiseExternalIp bool + AdvertiseClusterIP bool + AdvertiseExternalIP bool AdvertiseNodePodCidr bool - AdvertiseLoadBalancerIp bool + AdvertiseLoadBalancerIP bool BGPGracefulRestart bool BGPGracefulRestartTime time.Duration BGPGracefulRestartDeferralTime time.Duration - BGPHoldtime time.Duration + BGPHoldTime time.Duration BGPPort uint16 CacheSyncTimeout time.Duration CleanupConfig bool @@ -52,14 +52,14 @@ type
KubeRouterConfig struct { MetricsEnabled bool MetricsPath string MetricsPort uint16 - NodePortBindOnAllIp bool + NodePortBindOnAllIP bool OverrideNextHop bool PeerASNs []uint - PeerMultihopTtl uint8 + PeerMultihopTTL uint8 PeerPasswords []string PeerPorts []uint PeerRouters []net.IP - RouterId string + RouterID string RoutesSyncPeriod time.Duration RunFirewall bool RunRouter bool @@ -77,7 +77,7 @@ func NewKubeRouterConfig() *KubeRouterConfig { IpvsGracefulPeriod: 30 * time.Second, RoutesSyncPeriod: 5 * time.Minute, BGPGracefulRestartTime: 90 * time.Second, - BGPHoldtime: 90 * time.Second, + BGPHoldTime: 90 * time.Second, BGPGracefulRestartDeferralTime: 360 * time.Second, EnableOverlay: true, OverlayType: "subnet", @@ -129,23 +129,23 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "Enables rule to accept all incoming traffic to service VIP's on the node.") fs.DurationVar(&s.RoutesSyncPeriod, "routes-sync-period", s.RoutesSyncPeriod, "The delay between route updates and advertisements (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") - fs.BoolVar(&s.AdvertiseClusterIp, "advertise-cluster-ip", false, + fs.BoolVar(&s.AdvertiseClusterIP, "advertise-cluster-ip", false, "Add Cluster IP of the service to the RIB so that it gets advertised to the BGP peers.") - fs.BoolVar(&s.AdvertiseExternalIp, "advertise-external-ip", false, + fs.BoolVar(&s.AdvertiseExternalIP, "advertise-external-ip", false, "Add External IP of service to the RIB so that it gets advertised to the BGP peers.") - fs.BoolVar(&s.AdvertiseLoadBalancerIp, "advertise-loadbalancer-ip", false, + fs.BoolVar(&s.AdvertiseLoadBalancerIP, "advertise-loadbalancer-ip", false, "Add LoadBalancer IP of service status as set by the LB provider to the RIB so that it gets advertised to the BGP peers.") fs.BoolVar(&s.AdvertiseNodePodCidr, "advertise-pod-cidr", true, "Add Node's POD cidr to the RIB so that it gets advertised to the BGP peers.") fs.IPSliceVar(&s.PeerRouters, "peer-router-ips", s.PeerRouters, "The ip address of the external router to which all nodes will peer and advertise the cluster ip and pod cidr's.") fs.UintSliceVar(&s.PeerPorts, "peer-router-ports", s.PeerPorts, - "The remote port of the external BGP to which all nodes will peer. If not set, default BGP port ("+strconv.Itoa(DEFAULT_BGP_PORT)+") will be used.") + "The remote port of the external BGP to which all nodes will peer. If not set, default BGP port ("+strconv.Itoa(DefaultBgpPort)+") will be used.") fs.UintVar(&s.ClusterAsn, "cluster-asn", s.ClusterAsn, "ASN number under which cluster nodes will run iBGP.") fs.UintSliceVar(&s.PeerASNs, "peer-router-asns", s.PeerASNs, "ASN numbers of the BGP peer to which cluster nodes will advertise cluster ip and node's pod cidr.") - fs.Uint8Var(&s.PeerMultihopTtl, "peer-router-multihop-ttl", s.PeerMultihopTtl, + fs.Uint8Var(&s.PeerMultihopTTL, "peer-router-multihop-ttl", s.PeerMultihopTTL, "Enable eBGP multihop support -- sets multihop-ttl. 
(Relevant only if ttl >= 2)") fs.BoolVar(&s.FullMeshMode, "nodes-full-mesh", true, "Each node in the cluster will setup BGP peering with rest of the nodes.") @@ -155,11 +155,11 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "BGP Graceful restart time according to RFC4724 3, maximum 4095s.") fs.DurationVar(&s.BGPGracefulRestartDeferralTime, "bgp-graceful-restart-deferral-time", s.BGPGracefulRestartDeferralTime, "BGP Graceful restart deferral time according to RFC4724 4.1, maximum 18h.") - fs.DurationVar(&s.BGPHoldtime, "bgp-holdtime", DEFAULT_BGP_HOLDTIME, + fs.DurationVar(&s.BGPHoldTime, "bgp-holdtime", DefaultBgpHoldTime, "This parameter is mainly used to modify the holdtime declared to the BGP peer. When Kube-router goes down abnormally, the local saving time of BGP route will be affected. Holdtime must be in the range 3s to 18h12m16s.") - fs.Uint16Var(&s.BGPPort, "bgp-port", DEFAULT_BGP_PORT, + fs.Uint16Var(&s.BGPPort, "bgp-port", DefaultBgpPort, "The port open for incoming BGP connections and to use for connecting with other BGP peers.") - fs.StringVar(&s.RouterId, "router-id", "", "BGP router-id. Must be specified in a ipv6 only cluster.") + fs.StringVar(&s.RouterID, "router-id", "", "BGP router-id. Must be specified in an IPv6-only cluster.") fs.BoolVar(&s.EnableCNI, "enable-cni", true, "Enable CNI plugin. Disable if you want to use kube-router features alongside another CNI plugin.") fs.BoolVar(&s.EnableiBGP, "enable-ibgp", true, @@ -168,7 +168,7 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { "Overrides the NodeName of the node. Set this if kube-router is unable to determine your NodeName automatically.") fs.BoolVar(&s.GlobalHairpinMode, "hairpin-mode", false, "Add iptables rules for every Service Endpoint to support hairpin traffic.") - fs.BoolVar(&s.NodePortBindOnAllIp, "nodeport-bindon-all-ip", false, + fs.BoolVar(&s.NodePortBindOnAllIP, "nodeport-bindon-all-ip", false, "For service of NodePort type create IPVS service that listens on all IPs of the node.") fs.BoolVar(&s.EnableOverlay, "enable-overlay", true, "When enable-overlay is set to true, IP-in-IP tunneling is used for pod-to-pod networking across nodes in different subnets. "+ diff --git a/pkg/utils/ipset.go b/pkg/utils/ipset.go index 25531473e9..12c1c9d905 100644 --- a/pkg/utils/ipset.go +++ b/pkg/utils/ipset.go @@ -313,9 +313,8 @@ func (set *Set) IsActive() (bool, error) { func (set *Set) name() string { if set.Parent.isIpv6 { return "inet6:" + set.Name - } else { - return set.Name } + return set.Name } // Parse ipset save stdout. diff --git a/pkg/utils/pod_cidr.go b/pkg/utils/pod_cidr.go index 1d6be3f3e5..3d3e5dafd0 100644 --- a/pkg/utils/pod_cidr.go +++ b/pkg/utils/pod_cidr.go @@ -85,7 +85,7 @@ func InsertPodCidrInCniSpec(cniConfFilePath string, cidr string) error { } if !updatedCidr { - return fmt.Errorf("Failed to insert subnet cidr into CNI conf file: %s as CNI file is invalid.", cniConfFilePath) + return fmt.Errorf("failed to insert subnet cidr into CNI conf file: %s as CNI file is invalid", cniConfFilePath) } } else {
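A detail worth spelling out from the holdtime changes above: the odd-looking upper bound 18h12m16s is simply 65,536 seconds, and the lower bound 3s is the smallest non-zero holdtime BGP permits. A self-contained restatement of the check in NewNetworkRoutingController, as a hedged sketch (the helper name is illustrative, not part of the diff):

import (
	"fmt"
	"time"
)

// validateBGPHoldTime mirrors the bound enforced in this diff:
// 3s <= holdtime <= 18h12m16s (65536 seconds).
func validateBGPHoldTime(hold time.Duration) error {
	secs := hold.Seconds()
	if secs < 3 || secs > 65536 {
		return fmt.Errorf("BGP holdtime %v out of range: must be between 3s and 18h12m16s", hold)
	}
	return nil
}

With DefaultBgpHoldTime (90s) this passes; an 18h12m17s value would fail by exactly one second.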