diff --git a/pkg/network/node/egressip.go b/pkg/network/node/egressip.go
new file mode 100644
index 000000000000..72a75f713941
--- /dev/null
+++ b/pkg/network/node/egressip.go
@@ -0,0 +1,345 @@
+package node
+
+import (
+	"fmt"
+	"net"
+	"sync"
+	"syscall"
+
+	"github.com/golang/glog"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+	utilwait "k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/tools/cache"
+
+	osclient "github.com/openshift/origin/pkg/client"
+	networkapi "github.com/openshift/origin/pkg/network/apis/network"
+	"github.com/openshift/origin/pkg/network/common"
+
+	"github.com/vishvananda/netlink"
+)
+
+// nodeEgress records the egress IPs that a HostSubnet assigns to one node.
+type nodeEgress struct {
+	nodeIP    string
+	egressIPs sets.String
+}
+
+// namespaceEgress records the egress IP state of one NetNamespace, keyed by VNID.
+type namespaceEgress struct {
+	vnid uint32
+
+	// claimedIP is the egress IP it wants (NetNamespace.EgressIP[0]), or "" for none
+	claimedIP string
+	// assignedIP is an egress IP actually in use on nodeIP
+	assignedIP string
+	nodeIP     string
+}
+
+// egressIPWatcher tracks the egress IPs listed on HostSubnets and claimed by
+// NetNamespaces, and updates local addresses, iptables rules and OVS flows to
+// match. updateNode and updateNamespace hold the embedded mutex while they
+// read or modify the maps.
+type egressIPWatcher struct {
+	sync.Mutex
+
+	localIP string
+	oc      *ovsController
+
+	osClient *osclient.Client
+	iptables *NodeIPTables
+
+	// from HostSubnets
+	nodesByNodeIP   map[string]*nodeEgress
+	nodesByEgressIP map[string]*nodeEgress
+
+	// From NetNamespaces
+	namespacesByVNID     map[uint32]*namespaceEgress
+	namespacesByEgressIP map[string]*namespaceEgress
+
+	localEgressLink      netlink.Link
+	localEgressIPMaskLen int
+
+	// testModeChan, when non-nil, receives "claim <ip>"/"release <ip>" messages
+	// in place of the real netlink and iptables changes
+	testModeChan chan string
+}
+
+// newEgressIPWatcher returns a watcher for the node whose address is localIP;
+// it does nothing until Start is called.
+func newEgressIPWatcher(localIP string, oc *ovsController) *egressIPWatcher {
+	return &egressIPWatcher{
+		localIP: localIP,
+		oc:      oc,
+
+		nodesByNodeIP:   make(map[string]*nodeEgress),
+		nodesByEgressIP: make(map[string]*nodeEgress),
+
+		namespacesByVNID:     make(map[uint32]*namespaceEgress),
+		namespacesByEgressIP: make(map[string]*namespaceEgress),
+	}
+}
+
+// Start stores the API client and iptables handle, then launches the
+// HostSubnet and NetNamespace watch loops in background goroutines.
+func (eip *egressIPWatcher) Start(osClient *osclient.Client, iptables *NodeIPTables) error {
+	eip.iptables = iptables
+	eip.osClient = osClient
+
+	go utilwait.Forever(eip.watchHostSubnets, 0)
+	go utilwait.Forever(eip.watchNetNamespaces, 0)
+	return nil
+}
+
+// ipToHex formats an IPv4 address as a 32-bit hex literal, for example
+// "0xac110066" for "172.17.0.102". Input that is not a valid IPv4 address
+// (unparseable text or an IPv6 address) yields a placeholder string.
+func ipToHex(ip string) string {
+	// To4 returns nil for both a failed parse and a non-IPv4 address, so one
+	// check covers every case that would otherwise index a nil slice.
+	ip4 := net.ParseIP(ip).To4()
+	if ip4 == nil {
+		return "invalid IP: shouldn't happen"
+	}
+	return fmt.Sprintf("0x%02x%02x%02x%02x", ip4[0], ip4[1], ip4[2], ip4[3])
+}
+
+// watchHostSubnets passes each HostSubnet event to updateNode; a deleted
+// HostSubnet is reported as having no egress IPs.
+func (eip *egressIPWatcher) watchHostSubnets() {
+	common.RunEventQueue(eip.osClient, common.HostSubnets, func(delta cache.Delta) error {
+		hs := delta.Object.(*networkapi.HostSubnet)
+
+		var egressIPs []string
+		if delta.Type != cache.Deleted {
+			egressIPs = hs.EgressIPs
+		}
+
+		eip.updateNode(hs.HostIP, egressIPs)
+		return nil
+	})
+}
+
+// updateNode reconciles the egress IPs hosted by nodeIP with nodeEgressIPs:
+// newly listed IPs are claimed (if local) and attached to any namespace
+// waiting for them, and IPs no longer listed are released and detached.
+func (eip *egressIPWatcher) updateNode(nodeIP string, nodeEgressIPs []string) {
+	eip.Lock()
+	defer eip.Unlock()
+
+	node := eip.nodesByNodeIP[nodeIP]
+	if node == nil {
+		if len(nodeEgressIPs) == 0 {
+			return
+		}
+		node = &nodeEgress{nodeIP: nodeIP, egressIPs: sets.NewString()}
+		eip.nodesByNodeIP[nodeIP] = node
+	} else if len(nodeEgressIPs) == 0 {
+		delete(eip.nodesByNodeIP, nodeIP)
+	}
+	oldEgressIPs := node.egressIPs
+	node.egressIPs = sets.NewString(nodeEgressIPs...)
+
+	// Process new EgressIPs
+	for _, ip := range node.egressIPs.Difference(oldEgressIPs).UnsortedList() {
+		eip.nodesByEgressIP[ip] = node
+		hex := ipToHex(ip)
+
+		if nodeIP == eip.localIP {
+			if err := eip.claimEgressIP(ip, hex); err != nil {
+				glog.Errorf("Error claiming Egress IP %q: %v", ip, err)
+			}
+		}
+
+		if ns, exists := eip.namespacesByEgressIP[ip]; exists {
+			if ns.assignedIP == "" {
+				ns.assignedIP = ip
+				ns.nodeIP = nodeIP
+				err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, ns.nodeIP, hex)
+				if err != nil {
+					glog.Errorf("Error updating Namespace egress rules: %v", err)
+				}
+			}
+		}
+	}
+
+	// Process removed EgressIPs
+	for _, ip := range oldEgressIPs.Difference(node.egressIPs).UnsortedList() {
+		delete(eip.nodesByEgressIP, ip)
+		hex := ipToHex(ip)
+
+		if nodeIP == eip.localIP {
+			if err := eip.releaseEgressIP(ip, hex); err != nil {
+				glog.Errorf("Error releasing Egress IP %q: %v", ip, err)
+			}
+		}
+
+		if ns, exists := eip.namespacesByEgressIP[ip]; exists {
+			if ns.assignedIP == ip {
+				ns.assignedIP = ""
+				ns.nodeIP = ""
+				err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, "", hex)
+				if err != nil {
+					glog.Errorf("Error updating Namespace egress rules: %v", err)
+				}
+			}
+		}
+	}
+}
+
+// watchNetNamespaces passes each NetNamespace event to updateNamespace, using
+// the first listed egress IP, or "" when deleted or when none is listed.
+func (eip *egressIPWatcher) watchNetNamespaces() {
+	common.RunEventQueue(eip.osClient, common.NetNamespaces, func(delta cache.Delta) error {
+		netns := delta.Object.(*networkapi.NetNamespace)
+
+		var egressIP string
+		if delta.Type != cache.Deleted && len(netns.EgressIPs) != 0 {
+			egressIP = netns.EgressIPs[0]
+		}
+
+		eip.updateNamespace(netns.NetID, egressIP)
+		return nil
+	})
+}
+
+// updateNamespace records that the namespace with the given VNID now claims
+// egressIP ("" for none) and reprograms its egress flows. A namespace that
+// no longer claims an egress IP is dropped from the watcher's maps.
+func (eip *egressIPWatcher) updateNamespace(vnid uint32, egressIP string) {
+	eip.Lock()
+	defer eip.Unlock()
+
+	ns := eip.namespacesByVNID[vnid]
+	if ns == nil {
+		if egressIP == "" {
+			return
+		}
+		ns = &namespaceEgress{vnid: vnid}
+		eip.namespacesByVNID[vnid] = ns
+	}
+	if ns.claimedIP == egressIP {
+		return
+	}
+
+	if ns.claimedIP != "" {
+		// Only remove the index entry if it still points at this namespace;
+		// another namespace claiming the same IP may have replaced it.
+		if eip.namespacesByEgressIP[ns.claimedIP] == ns {
+			delete(eip.namespacesByEgressIP, ns.claimedIP)
+		}
+		ns.assignedIP = ""
+		ns.nodeIP = ""
+	}
+	ns.claimedIP = egressIP
+
+	egressHex := ""
+	if egressIP == "" {
+		// Nothing left to track: forget the namespace rather than indexing
+		// it under the empty string.
+		delete(eip.namespacesByVNID, vnid)
+	} else {
+		eip.namespacesByEgressIP[egressIP] = ns
+		if node := eip.nodesByEgressIP[egressIP]; node != nil {
+			ns.assignedIP = egressIP
+			ns.nodeIP = node.nodeIP
+		}
+		egressHex = ipToHex(egressIP)
+	}
+
+	err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, ns.nodeIP, egressHex)
+	if err != nil {
+		glog.Errorf("Error updating Namespace egress rules: %v", err)
+	}
+}
+
+// claimEgressIP adds egressIP to the interface that carries the node's own
+// address (using the same prefix length) and installs its iptables rules.
+// In test mode it only reports the claim on testModeChan.
+func (eip *egressIPWatcher) claimEgressIP(egressIP, egressHex string) error {
+	if eip.testModeChan != nil {
+		eip.testModeChan <- fmt.Sprintf("claim %s", egressIP)
+		return nil
+	}
+
+	if eip.localEgressLink == nil {
+		links, err := netlink.LinkList()
+		if err != nil {
+			return fmt.Errorf("could not get list of network interfaces while adding egress IP: %v", err)
+		}
+	linkLoop:
+		for _, link := range links {
+			addrs, err := netlink.AddrList(link, syscall.AF_INET)
+			if err != nil {
+				return fmt.Errorf("could not get addresses of interface %q while adding egress IP: %v", link.Attrs().Name, err)
+			}
+
+			for _, addr := range addrs {
+				if addr.IP.String() == eip.localIP {
+					eip.localEgressLink = link
+					eip.localEgressIPMaskLen, _ = addr.Mask.Size()
+					break linkLoop
+				}
+			}
+		}
+
+		if eip.localEgressLink == nil {
+			return fmt.Errorf("could not find network interface with the address %q while adding egress IP", eip.localIP)
+		}
+	}
+
+	egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
+	addr, err := netlink.ParseAddr(egressIPNet)
+	if err != nil {
+		return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
+	}
+	err = netlink.AddrAdd(eip.localEgressLink, addr)
+	if err != nil {
+		if err == syscall.EEXIST {
+			// Already present (for example left over from a previous run);
+			// carry on so the iptables rules still get installed.
+			glog.V(2).Infof("Egress IP %q already exists on %s", egressIPNet, eip.localEgressLink.Attrs().Name)
+		} else {
+			return fmt.Errorf("could not add egress IP %q to %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
+		}
+	}
+
+	if err := eip.iptables.AddEgressIPRules(egressIP, egressHex); err != nil {
+		return fmt.Errorf("could not add egress IP iptables rule: %v", err)
+	}
+
+	return nil
+}
+
+// releaseEgressIP removes egressIP from the local interface and deletes its
+// iptables rules. In test mode it only reports the release on testModeChan.
+func (eip *egressIPWatcher) releaseEgressIP(egressIP, egressHex string) error {
+	if eip.testModeChan != nil {
+		eip.testModeChan <- fmt.Sprintf("release %s", egressIP)
+		return nil
+	}
+
+	if eip.localEgressLink == nil {
+		return nil
+	}
+
+	egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
+	addr, err := netlink.ParseAddr(egressIPNet)
+	if err != nil {
+		return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
+	}
+	err = netlink.AddrDel(eip.localEgressLink, addr)
+	if err != nil {
+		if err == syscall.EADDRNOTAVAIL {
+			glog.V(2).Infof("Could not delete egress IP %q from %s: no such address", egressIPNet, eip.localEgressLink.Attrs().Name)
+		} else {
+			return fmt.Errorf("could not delete egress IP %q from %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
+		}
+	}
+
+	if err := eip.iptables.DeleteEgressIPRules(egressIP, egressHex); err != nil {
+		return fmt.Errorf("could not delete egress IP iptables rule: %v", err)
+	}
+
+	return nil
+}
diff --git a/pkg/network/node/egressip_test.go b/pkg/network/node/egressip_test.go
new file mode 100644
index 000000000000..ae72d24b7e63
--- /dev/null
+++ b/pkg/network/node/egressip_test.go
@@ -0,0 +1,275 @@
+package node
+
+import (
+	"fmt"
+	"testing"
+)
+
+func assertNoNetlinkChanges(eip *egressIPWatcher) error {
+	select {
+	case change := <-eip.testModeChan:
+		return fmt.Errorf("Unexpected netlink change %q", change)
+	default:
+		return nil
+	}
+}
+
+func assertNetlinkChange(eip *egressIPWatcher, expected string) error {
+	select {
+	case change := <-eip.testModeChan:
+		if change == expected {
+			return nil
+		}
+		return fmt.Errorf("Unexpected netlink change %q (expected %q)", change, expected)
+	default:
+		return fmt.Errorf("Missing netlink change (expected %q)", expected)
+	}
+}
+
+func TestEgressIP(t *testing.T) {
+	ovsif, oc, origFlows := setupOVSController(t)
+	if oc.localIP != "172.17.0.4" {
+		panic("details of fake ovsController changed")
+	}
+	eip := newEgressIPWatcher("172.17.0.4",
oc) + eip.testModeChan = make(chan string, 10) + + eip.updateNode("172.17.0.3", []string{}) + eip.updateNode("172.17.0.4", []string{}) + eip.updateNamespace(42, "") + eip.updateNamespace(43, "") + + // No namespaces use egress yet, so should be no changes + err := assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err := ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows) // no changes + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + + // Assign NetNamespace.EgressIP first, then HostSubnet.EgressIP, with a remote EgressIP + eip.updateNamespace(42, "172.17.0.100") + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=42", "drop"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + + eip.updateNode("172.17.0.3", []string{"172.17.0.100"}) + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=42", "172.17.0.3->tun_dst"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a remote EgressIP + eip.updateNode("172.17.0.3", []string{"172.17.0.101", "172.17.0.100"}) + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows) + if err 
!= nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + + eip.updateNamespace(43, "172.17.0.101") + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=43", "172.17.0.3->tun_dst"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Assign NetNamespace.EgressIP first, then HostSubnet.EgressIP, with a local EgressIP + eip.updateNamespace(44, "172.17.0.102") + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=44", "drop"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + + eip.updateNode("172.17.0.4", []string{"172.17.0.102"}) + err = assertNetlinkChange(eip, "claim 172.17.0.102") + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=44", "0xac110066->pkt_mark", "output:2"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a local EgressIP + eip.updateNode("172.17.0.4", []string{"172.17.0.102", "172.17.0.103"}) + err = assertNetlinkChange(eip, "claim 172.17.0.103") + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows) 
+ if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + + eip.updateNamespace(45, "172.17.0.103") + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=45", "0xac110067->pkt_mark", "output:2"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Drop namespace EgressIP + eip.updateNamespace(44, "") + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowRemoved, + match: []string{"table=100", "reg0=44", "0xac110066->pkt_mark", "output:2"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Drop remote node EgressIP + eip.updateNode("172.17.0.3", []string{"172.17.0.100"}) + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowRemoved, + match: []string{"table=100", "reg0=43", "172.17.0.3->tun_dst"}, + }, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=43", "drop"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } + origFlows = flows + + // Drop local node EgressIP + eip.updateNode("172.17.0.4", []string{"172.17.0.102"}) + err = assertNetlinkChange(eip, "release 172.17.0.103") + if err != nil { + t.Fatalf("%v", err) + } + flows, err = ovsif.DumpFlows() + if err != nil { + t.Fatalf("Unexpected error dumping flows: %v", err) + } + err = 
assertFlowChanges(origFlows, flows, + flowChange{ + kind: flowRemoved, + match: []string{"table=100", "reg0=45", "0xac110067->pkt_mark", "output:2"}, + }, + flowChange{ + kind: flowAdded, + match: []string{"table=100", "reg0=45", "drop"}, + }, + ) + if err != nil { + t.Fatalf("Unexpected flow changes: %v", err) + } +} diff --git a/pkg/network/node/iptables.go b/pkg/network/node/iptables.go index 269c01998cef..f263f5a7ce1e 100644 --- a/pkg/network/node/iptables.go +++ b/pkg/network/node/iptables.go @@ -185,3 +185,20 @@ func (n *NodeIPTables) getNodeIPTablesChains() []Chain { }, } } + +func (n *NodeIPTables) AddEgressIPRules(egressIP, egressHex string) error { + _, err := n.ipt.EnsureRule(iptables.Prepend, iptables.TableNAT, iptables.Chain("OPENSHIFT-MASQUERADE"), "-s", n.clusterNetworkCIDR, "-m", "mark", "--mark", egressHex, "-j", "SNAT", "--to-source", egressIP) + if err != nil { + return err + } + _, err = n.ipt.EnsureRule(iptables.Append, iptables.TableFilter, iptables.Chain("OPENSHIFT-FIREWALL-ALLOW"), "-d", egressIP, "-m", "conntrack", "--ctstate", "NEW", "-j", "REJECT") + return err +} + +func (n *NodeIPTables) DeleteEgressIPRules(egressIP, egressHex string) error { + err := n.ipt.DeleteRule(iptables.TableNAT, iptables.Chain("OPENSHIFT-MASQUERADE"), "-s", n.clusterNetworkCIDR, "-m", "mark", "--mark", egressHex, "-j", "SNAT", "--to-source", egressIP) + if err != nil { + return err + } + return n.ipt.DeleteRule(iptables.TableFilter, iptables.Chain("OPENSHIFT-FIREWALL-ALLOW"), "-d", egressIP, "-m", "conntrack", "--ctstate", "NEW", "-j", "REJECT") +} diff --git a/pkg/network/node/node.go b/pkg/network/node/node.go index f00faa023891..245d716c6a13 100644 --- a/pkg/network/node/node.go +++ b/pkg/network/node/node.go @@ -110,6 +110,8 @@ type OsdnNode struct { runtimeEndpoint string runtimeRequestTimeout time.Duration runtimeService kubeletapi.RuntimeService + + egressIP *egressIPWatcher } // Called by higher layers to create the plugin SDN node instance @@ -171,7 
+173,7 @@ func New(c *OsdnNodeConfig) (network.NodeInterface, error) { if err != nil { return nil, err } - oc := NewOVSController(ovsif, pluginId, useConnTrack) + oc := NewOVSController(ovsif, pluginId, useConnTrack, c.SelfIP) plugin := &OsdnNode{ policy: policy, @@ -187,6 +189,7 @@ func New(c *OsdnNodeConfig) (network.NodeInterface, error) { egressPolicies: make(map[uint32][]networkapi.EgressNetworkPolicy), egressDNS: common.NewEgressDNS(), kubeInformers: c.KubeInformers, + egressIP: newEgressIPWatcher(c.SelfIP, oc), runtimeEndpoint: c.RuntimeEndpoint, // 2 minutes is the current default value used in kubelet @@ -324,7 +327,9 @@ func (node *OsdnNode) Start() error { if err != nil { return err } - + if err = node.egressIP.Start(node.osClient, nodeIPTables); err != nil { + return err + } if err = node.policy.Start(node); err != nil { return err } diff --git a/pkg/network/node/ovscontroller.go b/pkg/network/node/ovscontroller.go index 5b99ac6c4a30..8b61443a1262 100644 --- a/pkg/network/node/ovscontroller.go +++ b/pkg/network/node/ovscontroller.go @@ -22,6 +22,7 @@ type ovsController struct { ovs ovs.Interface pluginId int useConnTrack bool + localIP string } const ( @@ -30,13 +31,13 @@ const ( Vxlan0 = "vxlan0" // rule versioning; increment each time flow rules change - ruleVersion = 4 + ruleVersion = 5 ruleVersionTable = 253 ) -func NewOVSController(ovsif ovs.Interface, pluginId int, useConnTrack bool) *ovsController { - return &ovsController{ovs: ovsif, pluginId: pluginId, useConnTrack: useConnTrack} +func NewOVSController(ovsif ovs.Interface, pluginId int, useConnTrack bool, localIP string) *ovsController { + return &ovsController{ovs: ovsif, pluginId: pluginId, useConnTrack: useConnTrack, localIP: localIP} } func (oc *ovsController) getVersionNote() string { @@ -61,7 +62,7 @@ func (oc *ovsController) AlreadySetUp() bool { return false } -func (oc *ovsController) SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway, nodeIP string) 
error { +func (oc *ovsController) SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway string) error { err := oc.ovs.AddBridge("fail-mode=secure", "protocols=OpenFlow13") if err != nil { return err @@ -175,11 +176,15 @@ func (oc *ovsController) SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localS // eg, "table=90, priority=100, ip, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1" otx.AddFlow("table=90, priority=0, actions=drop") - // Table 100: egress network policy dispatch; edited by UpdateEgressNetworkPolicy() - // eg, "table=100, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop - otx.AddFlow("table=100, priority=0, actions=output:2") - otx.AddFlow("table=100, priority=%d,tcp,tcp_dst=53,nw_dst=%s,actions=output:2", networkapi.EgressNetworkPolicyMaxRules+1, nodeIP) - otx.AddFlow("table=100, priority=%d,udp,udp_dst=53,nw_dst=%s,actions=output:2", networkapi.EgressNetworkPolicyMaxRules+1, nodeIP) + // Table 100: egress routing; edited by UpdateNamespaceEgressRules() + // eg, "table=100, priority=100, reg0=${tenant_id}, ip, actions=set_field:${egress_ip_hex}->pkt_mark,output:2" + otx.AddFlow("table=100, priority=0, actions=goto_table:101") + + // Table 101: egress network policy dispatch; edited by UpdateEgressNetworkPolicyRules() + // eg, "table=101, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop" + otx.AddFlow("table=101, priority=%d,tcp,tcp_dst=53,nw_dst=%s,actions=output:2", networkapi.EgressNetworkPolicyMaxRules+1, oc.localIP) + otx.AddFlow("table=101, priority=%d,udp,udp_dst=53,nw_dst=%s,actions=output:2", networkapi.EgressNetworkPolicyMaxRules+1, oc.localIP) + otx.AddFlow("table=101, priority=0, actions=output:2") // Table 110: outbound multicast filtering, updated by UpdateLocalMulticastFlows() // eg, "table=110, priority=100, reg0=${tenant_id}, actions=goto_table:111 @@ -403,7 +408,7 @@ func (oc *ovsController) UpdateEgressNetworkPolicyRules(policies []networkapi.Eg var inputErr error if 
len(policies) == 0 { - otx.DeleteFlows("table=100, reg0=%d", vnid) + otx.DeleteFlows("table=101, reg0=%d", vnid) } else if vnid == 0 { inputErr = fmt.Errorf("EgressNetworkPolicy in global network namespace is not allowed (%s); ignoring", policyNames(policies)) } else if len(namespaces) > 1 { @@ -411,18 +416,18 @@ func (oc *ovsController) UpdateEgressNetworkPolicyRules(policies []networkapi.Eg // Even though Egress network policy is defined per namespace, its implementation is based on VNIDs. // So in case of shared network namespaces, egress policy of one namespace will affect all other namespaces that are sharing the network which might not be desirable. inputErr = fmt.Errorf("EgressNetworkPolicy not allowed in shared NetNamespace (%s); dropping all traffic", strings.Join(namespaces, ", ")) - otx.DeleteFlows("table=100, reg0=%d", vnid) - otx.AddFlow("table=100, reg0=%d, priority=1, actions=drop", vnid) + otx.DeleteFlows("table=101, reg0=%d", vnid) + otx.AddFlow("table=101, reg0=%d, priority=1, actions=drop", vnid) } else if len(policies) > 1 { // Rationale: If we have allowed more than one policy, we could end up with different network restrictions depending // on the order of policies that were processed and also it doesn't give more expressive power than a single policy. 
inputErr = fmt.Errorf("multiple EgressNetworkPolicies in same network namespace (%s) is not allowed; dropping all traffic", policyNames(policies)) - otx.DeleteFlows("table=100, reg0=%d", vnid) - otx.AddFlow("table=100, reg0=%d, priority=1, actions=drop", vnid) + otx.DeleteFlows("table=101, reg0=%d", vnid) + otx.AddFlow("table=101, reg0=%d, priority=1, actions=drop", vnid) } else /* vnid != 0 && len(policies) == 1 */ { // Temporarily drop all outgoing traffic, to avoid race conditions while modifying the other rules - otx.AddFlow("table=100, reg0=%d, cookie=1, priority=65535, actions=drop", vnid) - otx.DeleteFlows("table=100, reg0=%d, cookie=0/1", vnid) + otx.AddFlow("table=101, reg0=%d, cookie=1, priority=65535, actions=drop", vnid) + otx.DeleteFlows("table=101, reg0=%d, cookie=0/1", vnid) dnsFound := false for i, rule := range policies[0].Spec.Egress { @@ -457,18 +462,18 @@ func (oc *ovsController) UpdateEgressNetworkPolicyRules(policies []networkapi.Eg dst = fmt.Sprintf(", nw_dst=%s", selector) } - otx.AddFlow("table=100, reg0=%d, priority=%d, ip%s, actions=%s", vnid, priority, dst, action) + otx.AddFlow("table=101, reg0=%d, priority=%d, ip%s, actions=%s", vnid, priority, dst, action) } } if dnsFound { if err := common.CheckDNSResolver(); err != nil { inputErr = fmt.Errorf("DNS resolver failed: %v, dropping all traffic for namespace: %q", err, namespaces[0]) - otx.DeleteFlows("table=100, reg0=%d", vnid) - otx.AddFlow("table=100, reg0=%d, priority=1, actions=drop", vnid) + otx.DeleteFlows("table=101, reg0=%d", vnid) + otx.AddFlow("table=101, reg0=%d, priority=1, actions=drop", vnid) } } - otx.DeleteFlows("table=100, reg0=%d, cookie=1/1", vnid) + otx.DeleteFlows("table=101, reg0=%d, cookie=1/1", vnid) } txErr := otx.EndTransaction() @@ -646,3 +651,22 @@ func (oc *ovsController) FindUnusedVNIDs() []int { return policyVNIDs.Difference(inUseVNIDs).UnsortedList() } + +func (oc *ovsController) UpdateNamespaceEgressRules(vnid uint32, nodeIP, egressHex string) error { + 
otx := oc.ovs.NewTransaction() + otx.DeleteFlows("table=100, reg0=%d", vnid) + + if egressHex == "" { + // Namespace no longer has an EgressIP; no VNID-specific rules needed + } else if nodeIP == "" { + // Namespace has Egress IP, but it is unavailable, so drop egress traffic + otx.AddFlow("table=100, priority=100, reg0=%d, actions=drop", vnid) + } else if nodeIP == oc.localIP { + // Local Egress IP + otx.AddFlow("table=100, priority=100, reg0=%d, ip, actions=set_field:%s->pkt_mark,output:2", vnid, egressHex) + } else { + // Remote Egress IP; send via VXLAN + otx.AddFlow("table=100, priority=100, reg0=%d, ip, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", vnid, nodeIP) + } + return otx.EndTransaction() +} diff --git a/pkg/network/node/ovscontroller_test.go b/pkg/network/node/ovscontroller_test.go index 26b7e5e79754..919d20e99c3c 100644 --- a/pkg/network/node/ovscontroller_test.go +++ b/pkg/network/node/ovscontroller_test.go @@ -14,10 +14,10 @@ import ( kapi "k8s.io/kubernetes/pkg/api" ) -func setup(t *testing.T) (ovs.Interface, *ovsController, []string) { +func setupOVSController(t *testing.T) (ovs.Interface, *ovsController, []string) { ovsif := ovs.NewFake(Br0) - oc := NewOVSController(ovsif, 0, true) - err := oc.SetupOVS("10.128.0.0/14", "172.30.0.0/16", "10.128.0.0/23", "10.128.0.1", "172.17.0.4") + oc := NewOVSController(ovsif, 0, true, "172.17.0.4") + err := oc.SetupOVS("10.128.0.0/14", "172.30.0.0/16", "10.128.0.0/23", "10.128.0.1") if err != nil { t.Fatalf("Unexpected error setting up OVS: %v", err) } @@ -98,7 +98,7 @@ func assertFlowChanges(origFlows, newFlows []string, changes ...flowChange) erro } func TestOVSHostSubnet(t *testing.T) { - ovsif, oc, origFlows := setup(t) + ovsif, oc, origFlows := setupOVSController(t) hs := networkapi.HostSubnet{ TypeMeta: metav1.TypeMeta{ @@ -154,7 +154,7 @@ func TestOVSHostSubnet(t *testing.T) { } func TestOVSService(t *testing.T) { - ovsif, oc, origFlows := setup(t) + ovsif, oc, 
origFlows := setupOVSController(t) svc := kapi.Service{ TypeMeta: metav1.TypeMeta{ @@ -221,7 +221,7 @@ const ( ) func TestOVSPod(t *testing.T) { - ovsif, oc, origFlows := setup(t) + ovsif, oc, origFlows := setupOVSController(t) // Add ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42) @@ -421,7 +421,7 @@ func TestGetPodDetails(t *testing.T) { } func TestOVSMulticast(t *testing.T) { - ovsif, oc, origFlows := setup(t) + ovsif, oc, origFlows := setupOVSController(t) // local flows err := oc.UpdateLocalMulticastFlows(99, true, []int{4, 5, 6}) @@ -629,7 +629,7 @@ func assertENPFlowAdditions(origFlows, newFlows []string, additions ...enpFlowAd var change flowChange change.kind = flowAdded change.match = []string{ - "table=100", + "table=101", fmt.Sprintf("reg0=%d", addition.vnid), fmt.Sprintf("priority=%d", len(addition.policy.Spec.Egress)-i), } @@ -651,7 +651,7 @@ func assertENPFlowAdditions(origFlows, newFlows []string, additions ...enpFlowAd } func TestOVSEgressNetworkPolicy(t *testing.T) { - ovsif, oc, origFlows := setup(t) + ovsif, oc, origFlows := setupOVSController(t) // SUCCESSFUL CASES @@ -949,25 +949,21 @@ func TestOVSEgressNetworkPolicy(t *testing.T) { func TestAlreadySetUp(t *testing.T) { testcases := []struct { flow string - note string success bool }{ { // Good note - flow: "cookie=0x0, duration=4.796s, table=253, n_packets=0, n_bytes=0, actions=note:00.04.00.00.00.00", - note: "00.04", + flow: "cookie=0x0, duration=4.796s, table=253, n_packets=0, n_bytes=0, actions=note:00.05.00.00.00.00", success: true, }, { // Wrong table - flow: "cookie=0x0, duration=4.796s, table=10, n_packets=0, n_bytes=0, actions=note:00.04.00.00.00.00", - note: "00.04", + flow: "cookie=0x0, duration=4.796s, table=10, n_packets=0, n_bytes=0, actions=note:00.05.00.00.00.00", success: false, }, { // No note flow: "cookie=0x0, duration=4.796s, table=253, n_packets=0, n_bytes=0, actions=goto_table:50", - note: "00.04", success: false, }, } @@ -977,7 
+973,7 @@ func TestAlreadySetUp(t *testing.T) { if err := ovsif.AddBridge("fail-mode=secure", "protocols=OpenFlow13"); err != nil { t.Fatalf("(%d) unexpected error from AddBridge: %v", i, err) } - oc := NewOVSController(ovsif, 0, true) + oc := NewOVSController(ovsif, 0, true, "172.17.0.4") otx := ovsif.NewTransaction() otx.AddFlow(tc.flow) @@ -1089,7 +1085,7 @@ func TestSyncVNIDRules(t *testing.T) { } for i, tc := range testcases { - _, oc, _ := setup(t) + _, oc, _ := setupOVSController(t) otx := oc.NewTransaction() for _, flow := range tc.flows { diff --git a/pkg/network/node/sdn_controller.go b/pkg/network/node/sdn_controller.go index d174565fb986..1b471345b46b 100644 --- a/pkg/network/node/sdn_controller.go +++ b/pkg/network/node/sdn_controller.go @@ -165,7 +165,7 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) { } glog.V(5).Infof("[SDN setup] full SDN setup required") - err = plugin.oc.SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway, plugin.localIP) + err = plugin.oc.SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway) if err != nil { return false, err }