Skip to content

Commit

Permalink
feature: support proxy https request from cloud to edge through yurt-…
Browse files Browse the repository at this point in the history
…tunnel

fixes openyurtio#414
  • Loading branch information
rambohe-ch committed Aug 31, 2021
1 parent 3b028c0 commit 842858e
Show file tree
Hide file tree
Showing 10 changed files with 710 additions and 83 deletions.
1 change: 1 addition & 0 deletions cmd/yurt-tunnel-server/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
dnsController, err := dns.NewCoreDNSRecordController(cfg.Client,
cfg.SharedInformerFactory,
cfg.ListenInsecureAddrForMaster,
cfg.ListenAddrForMaster,
cfg.DNSSyncPeriod)
if err != nil {
return fmt.Errorf("fail to create a new dnsController, %v", err)
Expand Down
2 changes: 2 additions & 0 deletions config/setup/yurt-tunnel-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ metadata:
name: yurt-tunnel-server-cfg
namespace: kube-system
data:
http-proxy-ports: ""
https-proxy-ports: ""
dnat-ports-pair: ""
---
apiVersion: apps/v1
Expand Down
2 changes: 2 additions & 0 deletions config/yaml-template/yurt-tunnel-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ metadata:
name: __project_prefix__-tunnel-server-cfg
namespace: kube-system
data:
http-proxy-ports: ""
https-proxy-ports: ""
dnat-ports-pair: ""
---
apiVersion: apps/v1
Expand Down
2 changes: 2 additions & 0 deletions pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ metadata:
name: yurt-tunnel-server-cfg
namespace: kube-system
data:
http-proxy-ports: ""
https-proxy-ports: ""
dnat-ports-pair: ""
`
YurttunnelServerDeployment = `
Expand Down
103 changes: 65 additions & 38 deletions pkg/yurttunnel/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,22 @@ type coreDNSRecordController struct {
cmInformerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
tunnelServerIP string
insecurePort int
syncPeriod int
listenInsecureAddr string
listenSecureAddr string
}

// NewCoreDNSRecordController create a CoreDNSRecordController that synchronizes node dns records with CoreDNS configuration
func NewCoreDNSRecordController(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
listenInsecureAddr string,
listenSecureAddr string,
syncPeriod int) (DNSRecordController, error) {

_, insecurePortStr, err := net.SplitHostPort(listenInsecureAddr)
if err != nil {
return nil, err
}

insecurePort, err := strconv.Atoi(insecurePortStr)
if err != nil {
return nil, err
}

dnsctl := &coreDNSRecordController{
kubeClient: client,
syncPeriod: syncPeriod,
insecurePort: insecurePort,
listenInsecureAddr: listenInsecureAddr,
listenSecureAddr: listenSecureAddr,
sharedInformerFactor: informerFactory,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tunnel-dns"),
}
Expand Down Expand Up @@ -321,11 +313,11 @@ func (dnsctl *coreDNSRecordController) ensureCoreDNSRecordConfigMap() error {

func (dnsctl *coreDNSRecordController) syncTunnelServerServiceAsWhole() error {
klog.V(2).Info("sync tunnel server service as whole")
dnatPorts, err := util.GetConfiguredDnatPorts(dnsctl.kubeClient, strconv.Itoa(dnsctl.insecurePort))
dnatPorts, portMappings, err := util.GetConfiguredProxyPortsAndMappings(dnsctl.kubeClient, dnsctl.listenInsecureAddr, dnsctl.listenSecureAddr)
if err != nil {
return err
}
return dnsctl.updateTunnelServerSvcDnatPorts(dnatPorts)
return dnsctl.updateTunnelServerSvcDnatPorts(dnatPorts, portMappings)
}

func (dnsctl *coreDNSRecordController) syncDNSRecordAsWhole() {
Expand Down Expand Up @@ -403,60 +395,95 @@ func (dnsctl *coreDNSRecordController) updateDNSRecords(records []string) error
return nil
}

func (dnsctl *coreDNSRecordController) updateTunnelServerSvcDnatPorts(ports []string) error {
func (dnsctl *coreDNSRecordController) updateTunnelServerSvcDnatPorts(ports []string, portMappings map[string]string) error {
svc, err := dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).
Get(context.Background(), constants.YurttunnelServerInternalServiceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to sync tunnel server internal service, %v", err)
}

changed, updatedSvcPorts := resolveServicePorts(svc, ports, portMappings)
if !changed {
return nil
}

svc.Spec.Ports = updatedSvcPorts
_, err = dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).Update(context.Background(), svc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to sync tunnel server service, %v", err)
}
return nil
}

// resolveServicePorts get service ports from specified service and ports.
func resolveServicePorts(svc *corev1.Service, ports []string, portMappings map[string]string) (bool, []corev1.ServicePort) {
changed := false

svcPortMap := make(map[int32]corev1.ServicePort)
svcPortMap := make(map[string]corev1.ServicePort)
for i := range svc.Spec.Ports {
port := svc.Spec.Ports[i]
svcPortMap[port.Port] = port
svcPortMap[fmt.Sprintf("%s:%d", port.Protocol, port.Port)] = port
}

dnatPortMap := make(map[int]bool)
dnatPortMap := make(map[string]bool)
for _, dnatPort := range ports {
portInt, err := strconv.Atoi(dnatPort)
if err != nil {
klog.Errorf("failed to parse dnat port %q, %v", dnatPort, err)
continue
}
dnatPortMap[portInt] = true

if p, ok := svcPortMap[int32(portInt)]; !ok || p.Protocol != corev1.ProtocolTCP {
port := corev1.ServicePort{
dst, ok := portMappings[dnatPort]
if !ok {
klog.Errorf("failed to find proxy destination for port: %s", dnatPort)
continue
}

_, targetPort, err := net.SplitHostPort(dst)
if err != nil {
klog.Errorf("failed to split target port, %v", err)
continue
}
targetPortInt, err := strconv.Atoi(targetPort)
if err != nil {
klog.Errorf("failed to parse target port, %v", err)
continue
}

tcpPort := fmt.Sprintf("%s:%s", corev1.ProtocolTCP, dnatPort)
dnatPortMap[tcpPort] = true

p, ok := svcPortMap[tcpPort]
// new port or has not tcp protocol port, add a new port for service
if !ok {
svcPortMap[tcpPort] = corev1.ServicePort{
Name: fmt.Sprintf("%v%v", dnatPortPrefix, dnatPort),
Port: int32(portInt),
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(dnsctl.insecurePort),
TargetPort: intstr.FromInt(targetPortInt),
}
changed = true
} else if p.TargetPort.String() != targetPort { // target port is changed, overwrite the old port in service
svcPortMap[tcpPort] = corev1.ServicePort{
Name: p.Name,
Port: p.Port,
Protocol: p.Protocol,
TargetPort: intstr.FromInt(targetPortInt),
}
svc.Spec.Ports = append(svc.Spec.Ports, port)
changed = true
}
}

updatedSvcPorts := make([]corev1.ServicePort, 0, len(svc.Spec.Ports))
for i := range svc.Spec.Ports {
port := svc.Spec.Ports[i]
if strings.HasPrefix(port.Name, dnatPortPrefix) && !dnatPortMap[int(port.Port)] {
for tcpPort, svcPort := range svcPortMap {
if strings.HasPrefix(tcpPort, string(corev1.ProtocolTCP)) &&
strings.HasPrefix(svcPort.Name, dnatPortPrefix) &&
!dnatPortMap[tcpPort] {
changed = true
continue
}
updatedSvcPorts = append(updatedSvcPorts, port)
}

if !changed {
return nil
updatedSvcPorts = append(updatedSvcPorts, svcPort)
}

svc.Spec.Ports = updatedSvcPorts
_, err = dnsctl.kubeClient.CoreV1().Services(constants.YurttunnelServerServiceNs).Update(context.Background(), svc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to sync tunnel server service, %v", err)
}
return nil
return changed, updatedSvcPorts
}
Loading

0 comments on commit 842858e

Please sign in to comment.