diff --git a/cmd/yurt-tunnel-server/app/start.go b/cmd/yurt-tunnel-server/app/start.go index 34748eebe26..a28e79a80a6 100644 --- a/cmd/yurt-tunnel-server/app/start.go +++ b/cmd/yurt-tunnel-server/app/start.go @@ -81,6 +81,7 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error { dnsController, err := dns.NewCoreDNSRecordController(cfg.Client, cfg.SharedInformerFactory, cfg.ListenInsecureAddrForMaster, + cfg.ListenAddrForMaster, cfg.DNSSyncPeriod) if err != nil { return fmt.Errorf("fail to create a new dnsController, %v", err) diff --git a/config/setup/yurt-tunnel-server.yaml b/config/setup/yurt-tunnel-server.yaml index e50080e9692..af370e8c656 100644 --- a/config/setup/yurt-tunnel-server.yaml +++ b/config/setup/yurt-tunnel-server.yaml @@ -129,6 +129,8 @@ metadata: name: yurt-tunnel-server-cfg namespace: kube-system data: + http-proxy-ports: "" + https-proxy-ports: "" dnat-ports-pair: "" --- apiVersion: apps/v1 diff --git a/config/yaml-template/yurt-tunnel-server.yaml b/config/yaml-template/yurt-tunnel-server.yaml index 48c93ebd41e..8b2f772866f 100644 --- a/config/yaml-template/yurt-tunnel-server.yaml +++ b/config/yaml-template/yurt-tunnel-server.yaml @@ -129,6 +129,8 @@ metadata: name: __project_prefix__-tunnel-server-cfg namespace: kube-system data: + http-proxy-ports: "" + https-proxy-ports: "" dnat-ports-pair: "" --- apiVersion: apps/v1 diff --git a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go index e65d22e66ad..281b5a84ddb 100644 --- a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go +++ b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go @@ -155,6 +155,8 @@ metadata: name: yurt-tunnel-server-cfg namespace: kube-system data: + http-proxy-ports: "" + https-proxy-ports: "" dnat-ports-pair: "" ` YurttunnelServerDeployment = ` diff --git a/pkg/yurttunnel/dns/dns.go b/pkg/yurttunnel/dns/dns.go index 8c0d7f11647..58511f4b2bb 100644 --- a/pkg/yurttunnel/dns/dns.go +++ b/pkg/yurttunnel/dns/dns.go @@ -82,30 +82,22 @@ type coreDNSRecordController struct { cmInformerSynced cache.InformerSynced queue workqueue.RateLimitingInterface tunnelServerIP string - insecurePort int syncPeriod int + listenInsecureAddr string + listenSecureAddr string } // NewCoreDNSRecordController create a CoreDNSRecordController that synchronizes node dns records with CoreDNS configuration func NewCoreDNSRecordController(client clientset.Interface, informerFactory informers.SharedInformerFactory, listenInsecureAddr string, + listenSecureAddr string, syncPeriod int) (DNSRecordController, error) { - - _, insecurePortStr, err := net.SplitHostPort(listenInsecureAddr) - if err != nil { - return nil, err - } - - insecurePort, err := strconv.Atoi(insecurePortStr) - if err != nil { - return nil, err - } - dnsctl := &coreDNSRecordController{ kubeClient: client, syncPeriod: syncPeriod, - insecurePort: insecurePort, + listenInsecureAddr: listenInsecureAddr, + listenSecureAddr: listenSecureAddr, sharedInformerFactor: informerFactory, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tunnel-dns"), } @@ -321,11 +313,11 @@ func (dnsctl *coreDNSRecordController) ensureCoreDNSRecordConfigMap() error { func (dnsctl *coreDNSRecordController) syncTunnelServerServiceAsWhole() error { klog.V(2).Info("sync tunnel server service as whole") - dnatPorts, err := util.GetConfiguredDnatPorts(dnsctl.kubeClient, strconv.Itoa(dnsctl.insecurePort)) + dnatPorts, portMappings, err := util.GetConfiguredProxyPortsAndMappings(dnsctl.kubeClient, 
dnsctl.listenInsecureAddr, dnsctl.listenSecureAddr)
 	if err != nil {
 		return err
 	}
-	return dnsctl.updateTunnelServerSvcDnatPorts(dnatPorts)
+	return dnsctl.updateTunnelServerSvcDnatPorts(dnatPorts, portMappings)
 }
 
 func (dnsctl *coreDNSRecordController) syncDNSRecordAsWhole() {
@@ -403,60 +395,95 @@ func (dnsctl *coreDNSRecordController) updateDNSRecords(records []string) error
 	return nil
 }
 
-func (dnsctl *coreDNSRecordController) updateTunnelServerSvcDnatPorts(ports []string) error {
+func (dnsctl *coreDNSRecordController) updateTunnelServerSvcDnatPorts(ports []string, portMappings map[string]string) error {
 	svc, err := dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).
 		Get(context.Background(), constants.YurttunnelServerInternalServiceName, metav1.GetOptions{})
 	if err != nil {
 		return fmt.Errorf("failed to sync tunnel server internal service, %v", err)
 	}
 
+	changed, updatedSvcPorts := resolveServicePorts(svc, ports, portMappings)
+	if !changed {
+		return nil
+	}
+
+	svc.Spec.Ports = updatedSvcPorts
+	_, err = dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).Update(context.Background(), svc, metav1.UpdateOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to sync tunnel server service, %v", err)
+	}
+	return nil
+}
+
+// resolveServicePorts gets the updated service ports from the specified service, proxy ports and port mappings.
+func resolveServicePorts(svc *corev1.Service, ports []string, portMappings map[string]string) (bool, []corev1.ServicePort) {
 	changed := false
-	svcPortMap := make(map[int32]corev1.ServicePort)
+	svcPortMap := make(map[string]corev1.ServicePort)
 	for i := range svc.Spec.Ports {
 		port := svc.Spec.Ports[i]
-		svcPortMap[port.Port] = port
+		svcPortMap[fmt.Sprintf("%s:%d", port.Protocol, port.Port)] = port
 	}
 
-	dnatPortMap := make(map[int]bool)
+	dnatPortMap := make(map[string]bool)
 	for _, dnatPort := range ports {
 		portInt, err := strconv.Atoi(dnatPort)
 		if err != nil {
 			klog.Errorf("failed to parse dnat port %q, %v", dnatPort, err)
 			continue
 		}
-		dnatPortMap[portInt] = true
 
-		if p, ok := svcPortMap[int32(portInt)]; !ok || p.Protocol != corev1.ProtocolTCP {
-			port := corev1.ServicePort{
+		dst, ok := portMappings[dnatPort]
+		if !ok {
+			klog.Errorf("failed to find proxy destination for port: %s", dnatPort)
+			continue
+		}
+
+		_, targetPort, err := net.SplitHostPort(dst)
+		if err != nil {
+			klog.Errorf("failed to split target port, %v", err)
+			continue
+		}
+		targetPortInt, err := strconv.Atoi(targetPort)
+		if err != nil {
+			klog.Errorf("failed to parse target port, %v", err)
+			continue
+		}
+
+		tcpPort := fmt.Sprintf("%s:%s", corev1.ProtocolTCP, dnatPort)
+		dnatPortMap[tcpPort] = true
+
+		p, ok := svcPortMap[tcpPort]
+		// the port is new or the existing port does not use the TCP protocol, so add a new port to the service
+		if !ok {
+			svcPortMap[tcpPort] = corev1.ServicePort{
 				Name:       fmt.Sprintf("%v%v", dnatPortPrefix, dnatPort),
 				Port:       int32(portInt),
 				Protocol:   corev1.ProtocolTCP,
-				TargetPort: intstr.FromInt(dnsctl.insecurePort),
+				TargetPort: intstr.FromInt(targetPortInt),
+			}
+			changed = true
+		} else if p.TargetPort.String() != targetPort { // the target port has changed, overwrite the old port in the service
+			svcPortMap[tcpPort] = corev1.ServicePort{
+				Name:       p.Name,
+				Port:       p.Port,
+				Protocol:   p.Protocol,
+				TargetPort: intstr.FromInt(targetPortInt),
 			}
-			svc.Spec.Ports = append(svc.Spec.Ports, port)
 			changed = true
 		}
 	}
 
 	updatedSvcPorts := make([]corev1.ServicePort, 0, len(svc.Spec.Ports))
-	for i := range svc.Spec.Ports {
-		port := svc.Spec.Ports[i]
-		if strings.HasPrefix(port.Name, dnatPortPrefix) &&
!dnatPortMap[int(port.Port)] { + for tcpPort, svcPort := range svcPortMap { + if strings.HasPrefix(tcpPort, string(corev1.ProtocolTCP)) && + strings.HasPrefix(svcPort.Name, dnatPortPrefix) && + !dnatPortMap[tcpPort] { changed = true continue } - updatedSvcPorts = append(updatedSvcPorts, port) - } - - if !changed { - return nil + updatedSvcPorts = append(updatedSvcPorts, svcPort) } - svc.Spec.Ports = updatedSvcPorts - _, err = dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).Update(context.Background(), svc, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to sync tunnel server service, %v", err) - } - return nil + return changed, updatedSvcPorts } diff --git a/pkg/yurttunnel/dns/dns_test.go b/pkg/yurttunnel/dns/dns_test.go new file mode 100644 index 00000000000..95de5ee1133 --- /dev/null +++ b/pkg/yurttunnel/dns/dns_test.go @@ -0,0 +1,330 @@ +package dns + +import ( + "fmt" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestResolveServicePorts(t *testing.T) { + testcases := map[string]struct { + service *corev1.Service + currentPorts []string + currentPortMappings map[string]string + expectResult struct { + changed bool + svcPorts map[string]int + } + }{ + "add a new port": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: "https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + }, + }, + }, + currentPorts: []string{"9510"}, + currentPortMappings: map[string]string{"9510": "1.1.1.1:10264"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: true, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "dnat-9510:TCP:9510:10264": 1, + }}, + }, + "add port when udp protocol port exists": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: "https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + { + Name: "test-udp", + Protocol: "UDP", + Port: 9510, + TargetPort: intstr.FromString("10264"), + }, + }, + }, + }, + currentPorts: []string{"9510"}, + currentPortMappings: map[string]string{"9510": "1.1.1.1:10264"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: true, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "test-udp:UDP:9510:10264": 1, + "dnat-9510:TCP:9510:10264": 1, + }}, + }, + "update port with different target port": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: 
"https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + { + Name: "dnat-9510", + Protocol: "TCP", + Port: 9510, + TargetPort: intstr.FromString("10264"), + }, + }, + }, + }, + currentPorts: []string{"9510"}, + currentPortMappings: map[string]string{"9510": "1.1.1.1:10263"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: true, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "dnat-9510:TCP:9510:10263": 1, + }}, + }, + "add a new port when beyond default port exists": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: "https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + { + Name: "dnat-9510", + Protocol: "TCP", + Port: 9510, + TargetPort: intstr.FromString("10264"), + }, + }, + }, + }, + currentPorts: []string{"9510", "9511"}, + currentPortMappings: map[string]string{"9510": "1.1.1.1:10264", "9511": "1.1.1.1:10263"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: true, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "dnat-9510:TCP:9510:10264": 1, + "dnat-9511:TCP:9511:10263": 1, + }, + }, + }, + "add a new port meanwhile delete an old port": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: "https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + { + Name: "dnat-9510", + Protocol: "TCP", + Port: 9510, + TargetPort: intstr.FromString("10264"), + }, + }, + }, + }, + currentPorts: []string{"9511"}, + currentPortMappings: map[string]string{"9511": "1.1.1.1:10263"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: true, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "dnat-9511:TCP:9511:10263": 1, + }, + }, + }, + "service ports have not changed": { + service: &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "x-tunnel-server-internal-svc", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: "TCP", + Port: 10255, + TargetPort: intstr.FromString("10264"), + }, + { + Name: "https", + Protocol: "TCP", + Port: 10250, + TargetPort: intstr.FromString("10263"), + }, + { + Name: "dnat-9510", + Protocol: "TCP", + Port: 9510, + TargetPort: intstr.FromString("10264"), + }, + }, + }, + }, + currentPorts: []string{"9510"}, + currentPortMappings: map[string]string{"9510": "1.1.1.1:10264"}, + expectResult: struct { + changed bool + svcPorts map[string]int + }{ + changed: false, + svcPorts: map[string]int{ + "http:TCP:10255:10264": 1, + "https:TCP:10250:10263": 1, + "dnat-9510:TCP:9510:10264": 1, + }, + }, + }, + } + + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + 
changed, svcPorts := resolveServicePorts(tt.service, tt.currentPorts, tt.currentPortMappings) + if tt.expectResult.changed != changed { + t.Errorf("expect changed: %v, but got changed: %v", tt.expectResult.changed, changed) + } + + portsMap := make(map[string]int) + for _, svcPort := range svcPorts { + key := fmt.Sprintf("%s:%s:%d:%s", svcPort.Name, svcPort.Protocol, svcPort.Port, svcPort.TargetPort.String()) + if cnt, ok := portsMap[key]; ok { + portsMap[key] = cnt + 1 + } else { + portsMap[key] = 1 + } + } + + // check the servicePorts + if len(tt.expectResult.svcPorts) != len(portsMap) { + t.Errorf("expect %d service ports, but got %d service ports", len(tt.expectResult.svcPorts), len(portsMap)) + } + + for k, v := range tt.expectResult.svcPorts { + if gotV, ok := portsMap[k]; !ok { + t.Errorf("expect key %s, but not got", k) + } else if v != gotV { + t.Errorf("key(%s): expect value %d, but got value %d", k, v, gotV) + } + } + }) + } +} diff --git a/pkg/yurttunnel/iptables/iptables.go b/pkg/yurttunnel/iptables/iptables.go index edf396de3f8..64c47eab26c 100644 --- a/pkg/yurttunnel/iptables/iptables.go +++ b/pkg/yurttunnel/iptables/iptables.go @@ -39,8 +39,6 @@ import ( ) const ( - kubeletSecurePort = "10250" - kubeletInsecurePort = "10255" loopbackAddr = "127.0.0.1" reqReturnComment = "return request to access node directly" dnatToTunnelComment = "dnat to tunnel for access node" @@ -85,7 +83,6 @@ type iptablesManager struct { conntrackPath string secureDnatDest string insecureDnatDest string - insecurePort string lastNodesIP []string lastDnatPorts []string syncPeriod int @@ -108,11 +105,6 @@ func NewIptablesManager(client clientset.Interface, syncPeriod = defaultSyncPeriod } - _, insecurePort, err := net.SplitHostPort(listenInsecureAddr) - if err != nil { - return nil - } - im := &iptablesManager{ kubeClient: client, iptables: iptInterface, @@ -120,7 +112,6 @@ func NewIptablesManager(client clientset.Interface, nodeInformer: nodeInformer, secureDnatDest: listenAddr, insecureDnatDest: listenInsecureAddr, - insecurePort: insecurePort, lastNodesIP: make([]string, 0), lastDnatPorts: make([]string, 0), syncPeriod: syncPeriod, @@ -172,7 +163,7 @@ func (im *iptablesManager) cleanupIptableSetting() { for _, port := range im.lastDnatPorts { deletedJumpChains = append(deletedJumpChains, iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port))) } - for _, port := range []string{kubeletSecurePort, kubeletInsecurePort} { + for _, port := range []string{util.KubeletHTTPSPort, util.KubeletHTTPPort} { deletedJumpChains = append(deletedJumpChains, iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port))) } @@ -287,7 +278,7 @@ func getNodeInternalIPs(node *corev1.Node) []string { // ensurePortsIptables ensures jump chains and rules for active dnat ports, and // delete the jump chains if their corresponding dnat ports are removed -func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, currentIPs, deletedIPs []string) error { +func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, currentIPs, deletedIPs []string, portMappings map[string]string) error { // for each dnat port, we create a jump chain jumpChains := iptablesJumpChains for _, port := range currentPorts { @@ -306,7 +297,7 @@ func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, curre // ensure iptable rule for each dnat port for _, port := range currentPorts { - err := im.ensurePortIptables(port, currentIPs, deletedIPs) + err := im.ensurePortIptables(port, 
currentIPs, deletedIPs, portMappings)
 		if err != nil {
 			return err
 		}
@@ -335,7 +326,7 @@ func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, curre
 	return nil
 }
 
-func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIPs []string) error {
+func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIPs []string, portMappings map[string]string) error {
 	portChain := iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port))
 
 	if len(currentIPs) == 0 {
@@ -351,11 +342,15 @@ func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIP
 
 	// decide the proxy destination based on the port number
 	proxyDest := im.insecureDnatDest
-	if port == kubeletSecurePort {
+	if port == util.KubeletHTTPSPort {
 		proxyDest = im.secureDnatDest
+	} else if port == util.KubeletHTTPPort {
+		proxyDest = im.insecureDnatDest
+	} else if dst, ok := portMappings[port]; ok {
+		proxyDest = dst
 	}
 
-	// do not proxy packets, whose destination node doesn't has agent running
+	// do not proxy packets whose destination node doesn't have an agent running
 	for _, ip := range currentIPs {
 		reqReturnPortIptablesArgs := reqReturnIptablesArgs(reqReturnComment, port, ip)
 		_, err := im.iptables.EnsureRule(
@@ -495,21 +490,21 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
 // while the request to access the cloud node is returned
 func (im *iptablesManager) syncIptableSetting() {
 	// check if there are new dnat ports
-	dnatPorts, err := util.GetConfiguredDnatPorts(im.kubeClient, im.insecurePort)
+	dnatPorts, portMappings, err := util.GetConfiguredProxyPortsAndMappings(im.kubeClient, im.insecureDnatDest, im.secureDnatDest)
 	if err != nil {
 		klog.Errorf("failed to sync iptables rules, %v", err)
 		return
 	}
 	portsChanged, deletedDnatPorts := im.getDeletedPorts(dnatPorts)
-	currentDnatPorts := append(dnatPorts, kubeletSecurePort, kubeletInsecurePort)
+	currentDnatPorts := append(dnatPorts, util.KubeletHTTPSPort, util.KubeletHTTPPort)
 
 	// check if there are new nodes
 	nodesIP := im.getIPOfNodesWithoutAgent()
 	nodesChanged, addedNodesIP, deletedNodesIP := im.getAddedAndDeletedNodes(nodesIP)
 	currentNodesIP := append(nodesIP, loopbackAddr)
 
-	// update the iptable setting if necessary
-	err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP)
+	// update the iptables setting if necessary
+	err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP, portMappings)
 	if err != nil {
 		klog.Errorf("failed to ensurePortsIptables: %v", err)
 		return
diff --git a/pkg/yurttunnel/iptables/iptables_test.go b/pkg/yurttunnel/iptables/iptables_test.go
index 6363cd597f7..d83b5f9ae94 100644
--- a/pkg/yurttunnel/iptables/iptables_test.go
+++ b/pkg/yurttunnel/iptables/iptables_test.go
@@ -39,11 +39,6 @@ func newFakeIptablesManager(client clientset.Interface,
 		syncPeriod = defaultSyncPeriod
 	}
 
-	_, insecurePort, err := net.SplitHostPort(listenInsecureAddr)
-	if err != nil {
-		return nil
-	}
-
 	im := &iptablesManager{
 		kubeClient:       client,
 		iptables:         iptInterface,
@@ -51,7 +46,6 @@ func newFakeIptablesManager(client clientset.Interface,
 		nodeInformer:     nodeInformer,
 		secureDnatDest:   listenAddr,
 		insecureDnatDest: listenInsecureAddr,
-		insecurePort:     insecurePort,
 		lastNodesIP:      make([]string, 0),
 		lastDnatPorts:    make([]string, 0),
 		syncPeriod:       syncPeriod,
diff --git a/pkg/yurttunnel/util/util.go b/pkg/yurttunnel/util/util.go
index 17f38ad1fa9..f4d8e8a0c98 100644
--- a/pkg/yurttunnel/util/util.go
+++ b/pkg/yurttunnel/util/util.go
@@ -3,24 +3,32 @@ package util
 import (
 	"context"
 	"fmt"
+	"net"
 	"net/http"
+	"strconv"
 	"strings"
 
-	"github.com/gorilla/mux"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 
+	"github.com/gorilla/mux"
 	"github.com/openyurtio/openyurt/pkg/profile"
 	"github.com/openyurtio/openyurt/pkg/projectinfo"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 const (
 	// constants related dnat rules configmap
 	YurttunnelServerDnatConfigMapNs = "kube-system"
 	yurttunnelServerDnatDataKey     = "dnat-ports-pair"
+	yurttunnelServerHTTPProxyPorts  = "http-proxy-ports"
+	yurttunnelServerHTTPSProxyPorts = "https-proxy-ports"
+
+	KubeletHTTPSPort = "10250"
+	KubeletHTTPPort  = "10255"
 )
 
 var (
@@ -52,41 +60,113 @@ func RunMetaServer(addr string) {
 	}()
 }
 
-// GetConfiguredDnatPorts returns the DNAT ports configured for tunnel server.
-// NOTE: We only allow user to add dnat rule that uses insecure port as the destination port currently.
-func GetConfiguredDnatPorts(client clientset.Interface, insecurePort string) ([]string, error) {
-	ports := make([]string, 0)
+// GetConfiguredProxyPortsAndMappings returns the proxy ports and mappings that are configured for the tunnel server.
+// The dnat-ports-pair field will be deprecated in a future version. It's recommended to use
+// the http-proxy-ports and https-proxy-ports fields instead.
+func GetConfiguredProxyPortsAndMappings(client clientset.Interface, insecureListenAddr, secureListenAddr string) ([]string, map[string]string, error) {
 	c, err := client.CoreV1().
 		ConfigMaps(YurttunnelServerDnatConfigMapNs).
 		Get(context.Background(), YurttunnelServerDnatConfigMapName, metav1.GetOptions{})
 	if err != nil {
 		if apierrors.IsNotFound(err) {
-			return nil, fmt.Errorf("configmap %s/%s is not found",
+			return []string{}, map[string]string{}, fmt.Errorf("configmap %s/%s is not found",
 				YurttunnelServerDnatConfigMapNs,
 				YurttunnelServerDnatConfigMapName)
 		} else {
-			return nil, fmt.Errorf("fail to get configmap %s/%s: %v",
+			return []string{}, map[string]string{}, fmt.Errorf("fail to get configmap %s/%s: %v",
 				YurttunnelServerDnatConfigMapNs,
 				YurttunnelServerDnatConfigMapName, err)
 		}
 	}
 
-	pairStr, ok := c.Data[yurttunnelServerDnatDataKey]
-	if !ok || len(pairStr) == 0 {
-		return ports, nil
+	return resolveProxyPortsAndMappings(c, insecureListenAddr, secureListenAddr)
+}
+
+// resolveProxyPortsAndMappings gets proxy ports and port mappings from the specified configmap
+func resolveProxyPortsAndMappings(cm *v1.ConfigMap, insecureListenAddr, secureListenAddr string) ([]string, map[string]string, error) {
+	var insecurePort string
+	var securePort string
+	var err error
+	portMappings := make(map[string]string, 0)
+	proxyPorts := make([]string, 0)
+
+	if _, insecurePort, err = net.SplitHostPort(insecureListenAddr); err != nil {
+		return proxyPorts, portMappings, err
+	}
+	if _, securePort, err = net.SplitHostPort(secureListenAddr); err != nil {
+		return proxyPorts, portMappings, err
 	}
 
-	portsPair := strings.Split(pairStr, ",")
-	for _, pair := range portsPair {
-		portPair := strings.Split(pair, "=")
-		if len(portPair) == 2 &&
-			portPair[1] == insecurePort &&
-			len(portPair[0]) != 0 {
-			if portPair[0] != "10250" && portPair[0] != "10255" {
-				ports = append(ports, portPair[0])
+	// field dnat-ports-pair will be deprecated in a future version
+	pairStr, ok := cm.Data[yurttunnelServerDnatDataKey]
+	if ok && len(strings.TrimSpace(pairStr)) != 0 {
+		portsPair := strings.Split(pairStr, ",")
+		for _, pair := range portsPair {
+			portParts := strings.Split(pair, "=")
+			if len(portParts) == 2 {
+				proxyPort := strings.TrimSpace(portParts[0])
+				if len(proxyPort) != 0 && strings.TrimSpace(portParts[1]) == insecurePort {
+					portMappings[proxyPort] = insecureListenAddr
+				} else if len(proxyPort) != 0 && strings.TrimSpace(portParts[1]) == securePort {
+					portMappings[proxyPort] = secureListenAddr
+				}
+			}
+		}
 	}
 
-	return ports, nil
+	// resolve the http-proxy-ports field
+	httpProxyPorts, ok := cm.Data[yurttunnelServerHTTPProxyPorts]
+	if ok && len(strings.TrimSpace(httpProxyPorts)) != 0 {
+		ports := strings.Split(httpProxyPorts, ",")
+		for _, port := range ports {
+			proxyPort := strings.TrimSpace(port)
+			if len(proxyPort) != 0 {
+				portInt, err := strconv.Atoi(proxyPort)
+				if err != nil {
+					klog.Errorf("failed to parse http port %s, %v", port, err)
+					continue
+				} else if portInt < 1 || portInt > 65535 {
+					klog.Errorf("port %s is not a valid http port", port)
+					continue
+				}
+
+				portMappings[proxyPort] = insecureListenAddr
+			}
+		}
+	}
+
+	// resolve the https-proxy-ports field
+	httpsProxyPorts, ok := cm.Data[yurttunnelServerHTTPSProxyPorts]
+	if ok && len(strings.TrimSpace(httpsProxyPorts)) != 0 {
+		ports := strings.Split(httpsProxyPorts, ",")
+		for _, port := range ports {
+			proxyPort := strings.TrimSpace(port)
+			if len(proxyPort) != 0 {
+				portInt, err := strconv.Atoi(proxyPort)
+				if err != nil {
+					klog.Errorf("failed to parse https port %s, %v", port, err)
+					continue
+				} else if portInt < 1 || portInt > 65535 {
+					klog.Errorf("port %s is not a valid https port", port)
+					continue
+				}
+
+				portMappings[proxyPort] = secureListenAddr
+			}
+		}
+	}
+
+	if _, ok := portMappings[KubeletHTTPSPort]; ok {
+		delete(portMappings, KubeletHTTPSPort)
+	}
+
+	if _, ok := portMappings[KubeletHTTPPort]; ok {
+		delete(portMappings, KubeletHTTPPort)
+	}
+
+	for port := range portMappings {
+		proxyPorts = append(proxyPorts, port)
+	}
+
+	return proxyPorts, portMappings, nil
 }
diff --git a/pkg/yurttunnel/util/util_test.go b/pkg/yurttunnel/util/util_test.go
new file mode 100644
index 00000000000..a98a960284c
--- /dev/null
+++ b/pkg/yurttunnel/util/util_test.go
@@ -0,0 +1,195 @@
+package util
+
+import (
+	"testing"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var (
+	insecureListenAddr = "1.1.1.1:10264"
+	secureListenAddr   = "1.1.1.1:10263"
+)
+
+func TestResolveProxyPortsAndMappings(t *testing.T) {
+	testcases := map[string]struct {
+		configMap    *corev1.ConfigMap
+		expectResult struct {
+			ports        []string
+			portMappings map[string]string
+			err          error
+		}
+	}{
+		"setting ports on dnat-ports-pair": {
+			configMap: &corev1.ConfigMap{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "ConfigMap",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "yurt-tunnel-server-cfg",
+					Namespace: "kube-system",
+				},
+				Data: map[string]string{
+					"dnat-ports-pair": "9100=10264",
+				},
+			},
+			expectResult: struct {
+				ports        []string
+				portMappings map[string]string
+				err          error
+			}{
+				ports: []string{"9100"},
+				portMappings: map[string]string{
+					"9100": insecureListenAddr,
+				},
+				err: nil,
+			},
+		},
+		"setting ports on http-proxy-ports": {
+			configMap: &corev1.ConfigMap{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "ConfigMap",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "yurt-tunnel-server-cfg",
+					Namespace: "kube-system",
+				},
+				Data: map[string]string{
+					"http-proxy-ports": "9100,9200",
+				},
+			},
+			expectResult: struct {
+				ports        []string
+				portMappings map[string]string
+				err          error
+			}{
+				ports: []string{"9100", "9200"},
+				portMappings: map[string]string{
+					"9100": insecureListenAddr,
+					"9200": insecureListenAddr,
+				},
+				err: nil,
+			},
+		},
+		"setting ports on https-proxy-ports": {
+			configMap: &corev1.ConfigMap{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "ConfigMap",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "yurt-tunnel-server-cfg",
+					Namespace: "kube-system",
+				},
+				Data: map[string]string{
+					"https-proxy-ports": "9100,9200",
+				},
+			},
+			expectResult: struct {
+				ports        []string
+				portMappings map[string]string
+				err          error
+			}{
+				ports: []string{"9100", "9200"},
+				portMappings: map[string]string{
+					"9100": secureListenAddr,
+					"9200": secureListenAddr,
+				},
+				err: nil,
+			},
+		},
+		"setting ports on http-proxy-ports and https-proxy-ports": {
+			configMap: &corev1.ConfigMap{
+				TypeMeta: metav1.TypeMeta{
+					APIVersion: "v1",
+					Kind:       "ConfigMap",
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "yurt-tunnel-server-cfg",
+					Namespace: "kube-system",
+				},
+				Data: map[string]string{
+					"http-proxy-ports":  "9100,9200",
+					"https-proxy-ports": "9300,9400",
+				},
+			},
+			expectResult: struct {
+				ports        []string
+				portMappings map[string]string
+				err          error
+			}{
+				ports: []string{"9100", "9200", "9300", "9400"},
+				portMappings: map[string]string{
+					"9100": insecureListenAddr,
+					"9200": insecureListenAddr,
+					"9300": secureListenAddr,
+					"9400": secureListenAddr,
+				},
+				err: nil,
+			},
+		},
+	}
+
+	for k, tt := range testcases {
+		t.Run(k, func(t *testing.T) {
+			ports, portMappings, err := resolveProxyPortsAndMappings(tt.configMap, insecureListenAddr, secureListenAddr)
+			if tt.expectResult.err != err {
+				t.Errorf("expect error: %v, but got error: %v", tt.expectResult.err, err)
+			}
+
+			// check the ports
+			if len(tt.expectResult.ports) != len(ports) {
+				t.Errorf("expect %d ports, but got %d ports", len(tt.expectResult.ports), len(ports))
+			}
+
+			foundPort := false
+			for i := range tt.expectResult.ports {
+				foundPort = false
+				for j := range ports {
+					if tt.expectResult.ports[i] == ports[j] {
+						foundPort = true
+						break
+					}
+				}
+
+				if !foundPort {
+					t.Errorf("expect %v ports, but got ports %v", tt.expectResult.ports, ports)
+					break
+				}
+			}
+
+			foundPort = false
+			for i := range ports {
+				foundPort = false
+				for j := range tt.expectResult.ports {
+					if tt.expectResult.ports[j] == ports[i] {
+						foundPort = true
+						break
+					}
+				}
+
+				if !foundPort {
+					t.Errorf("expect %v ports, but got ports %v", tt.expectResult.ports, ports)
+					break
+				}
+			}
+
+			// check the portMappings
+			if len(tt.expectResult.portMappings) != len(portMappings) {
+				t.Errorf("expect port mappings %v, but got port mappings %v", tt.expectResult.portMappings, portMappings)
+			}
+
+			for port, v := range tt.expectResult.portMappings {
+				if gotV, ok := portMappings[port]; !ok {
+					t.Errorf("expect port %s, but not got", port)
+				} else if v != gotV {
+					t.Errorf("port(%s): expect dst value %s, but got dst value %s", port, v, gotV)
+				}
+				delete(portMappings, port)
+			}
+		})
+	}
+}
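Example (illustrative, not part of the patch): with the fields introduced above, ports listed in http-proxy-ports are proxied to the tunnel server's insecure (HTTP) listen address and ports listed in https-proxy-ports to its secure (HTTPS) listen address; 10250/10255 stay reserved for the built-in kubelet handling, and the legacy dnat-ports-pair field keeps working while it is marked for deprecation. Reusing the port numbers from the tests above, a populated yurt-tunnel-server-cfg ConfigMap could look like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: yurt-tunnel-server-cfg
  namespace: kube-system
data:
  http-proxy-ports: "9100,9200"
  https-proxy-ports: "9300,9400"
  dnat-ports-pair: ""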