From b7507bcdae3e62c6bcaaf182f7d42a718d01cf87 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Wed, 13 Dec 2023 20:40:02 +0800 Subject: [PATCH] Support Local ExternalTrafficPolicy for Services with ExternalIPs Since K8s 1.29, setting Local ExternalTrafficPolicy for ClusterIP Services with ExternalIPs is supported. Signed-off-by: Quan Tian --- pkg/agent/proxy/proxier_test.go | 49 ++++++++++++++++++++++++++----- third_party/proxy/util/service.go | 9 ++++-- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 94c8bb00575..81065124f87 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -155,6 +155,7 @@ func makeTestClusterIPService(svcPortName *k8sproxy.ServicePortName, nested bool, labels map[string]string) *corev1.Service { return makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.Type = corev1.ServiceTypeClusterIP svc.Spec.ClusterIP = clusterIP.String() svc.Spec.Ports = []corev1.ServicePort{{ Name: svcPortName.Port, @@ -2676,6 +2677,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, + externalIP net.IP, ep1IP net.IP, ep2IP net.IP, svcType corev1.ServiceType, @@ -2687,12 +2689,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, var svc, updatedSvc *corev1.Service switch svcType { + case corev1.ServiceTypeClusterIP: + // ExternalTrafficPolicy defaults to Cluster. + svc = makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) + updatedSvc = svc.DeepCopy() + updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) } makeServiceMap(fp, svc) @@ -2723,6 +2730,14 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, Protocol: bindingProtocol, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + ClusterGroupID: 1, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2754,6 +2769,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedLocalEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), @@ -2761,6 +2777,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, LocalGroupID: 2, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + LocalGroupID: 2, + ClusterGroupID: 1, + TrafficPolicyLocal: true, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) @@ -2802,19 +2829,25 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, func TestServiceExternalTrafficPolicyUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeClusterIP, false) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) }) }) t.Run("IPv6", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeClusterIP, true) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) }) }) } diff --git a/third_party/proxy/util/service.go b/third_party/proxy/util/service.go index 883c373d47f..9056f7b4626 100644 --- a/third_party/proxy/util/service.go +++ b/third_party/proxy/util/service.go @@ -37,10 +37,15 @@ package util import v1 "k8s.io/api/core/v1" +func ExternallyAccessible(service *v1.Service) bool { + return service.Spec.Type == v1.ServiceTypeLoadBalancer || + service.Spec.Type == v1.ServiceTypeNodePort || + (service.Spec.Type == v1.ServiceTypeClusterIP && len(service.Spec.ExternalIPs) > 0) +} + // ExternalPolicyLocal checks if service has ETP = Local. func ExternalPolicyLocal(service *v1.Service) bool { - if service.Spec.Type != v1.ServiceTypeLoadBalancer && - service.Spec.Type != v1.ServiceTypeNodePort { + if !ExternallyAccessible(service) { return false } return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal