Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AntreaProxy not deleting stale UDP conntrack entries for the virtual NodePort DNAT IP #6379

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func (p *proxier) removeStaleServiceConntrackEntries(svcPortName k8sproxy.Servic
svcPort := uint16(svcInfo.Port())
nodePort := uint16(svcInfo.NodePort())
svcProto := svcInfo.OFProtocol
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}

svcIPToPort := make(map[string]uint16)
svcIPToPort[svcInfo.ClusterIP().String()] = svcPort
Expand All @@ -354,6 +358,7 @@ func (p *proxier) removeStaleServiceConntrackEntries(svcPortName k8sproxy.Servic
for _, nodeIP := range p.nodePortAddresses {
svcIPToPort[nodeIP.String()] = nodePort
}
svcIPToPort[virtualNodePortDNATIP.String()] = nodePort
}

for svcIPStr, port := range svcIPToPort {
Expand All @@ -372,17 +377,23 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
svcPort := uint16(svcInfo.Port())
pNodePort := uint16(pSvcInfo.NodePort())
nodePort := uint16(svcInfo.NodePort())
pClusterIP := pSvcInfo.ClusterIP().String()
clusterIP := svcInfo.ClusterIP().String()
pExternalIPStrings := pSvcInfo.ExternalIPStrings()
externalIPStrings := svcInfo.ExternalIPStrings()
pLoadBalancerIPStrings := pSvcInfo.LoadBalancerIPStrings()
loadBalancerIPStrings := svcInfo.LoadBalancerIPStrings()
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}
var svcPortChanged, svcNodePortChanged bool

staleSvcIPToPort := make(map[string]uint16)
// If the port of the Service is changed, delete all conntrack entries related to the previous Service IPs and the
// previous Service port. These previous Service IPs includes external IPs, loadBalancer IPs and the ClusterIP.
if pSvcPort != svcPort {
staleSvcIPToPort[pSvcInfo.ClusterIP().String()] = pSvcPort
staleSvcIPToPort[pClusterIP] = pSvcPort
for _, ip := range pExternalIPStrings {
staleSvcIPToPort[ip] = pSvcPort
}
Expand All @@ -394,7 +405,10 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
svcPortChanged = true
} else {
// If the port of the Service is not changed, delete the conntrack entries related to the stale Service IPs and
// the Service port. These stale Service IPs could be from external IPs and loadBalancer IPs.
// the Service port. These stale Service IPs could be clusterIP, externalIPs or loadBalancerIPs.
if pClusterIP != clusterIP {
tnqn marked this conversation as resolved.
Show resolved Hide resolved
staleSvcIPToPort[pClusterIP] = pSvcPort
}
deletedExternalIPs := smallSliceDifference(pExternalIPStrings, externalIPStrings)
deletedLoadBalancerIPs := smallSliceDifference(pLoadBalancerIPStrings, loadBalancerIPStrings)
for _, ip := range deletedExternalIPs {
Expand All @@ -404,11 +418,13 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
staleSvcIPToPort[ip] = pSvcPort
}
}
// If the NodePort of the Service is changed, delete the contrack entries related to the Node IPs and the Service nodePort.
// If the NodePort of the Service is changed, delete the conntrack entries related to each of the Node IPs / the
// virtual IP to which NodePort traffic from external will be DNATed and the Service nodePort.
if pNodePort != nodePort {
for _, nodeIP := range p.nodePortAddresses {
staleSvcIPToPort[nodeIP.String()] = pNodePort
}
staleSvcIPToPort[virtualNodePortDNATIP.String()] = pNodePort
svcNodePortChanged = true
}
// Delete the conntrack entries due to the change of the Service.
Expand All @@ -423,7 +439,7 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
remainingSvcIPToPort := make(map[string]uint16)
if !svcPortChanged {
// Get all remaining Service IPs.
remainingSvcIPToPort[svcInfo.ClusterIP().String()] = svcPort
remainingSvcIPToPort[clusterIP] = svcPort
for _, ip := range smallSliceSame(pExternalIPStrings, externalIPStrings) {
remainingSvcIPToPort[ip] = svcPort
}
Expand Down Expand Up @@ -744,6 +760,21 @@ func (p *proxier) installServices() {
pSvcInfo.ExternalPolicyLocal() != svcInfo.ExternalPolicyLocal() ||
pSvcInfo.InternalPolicyLocal() != svcInfo.InternalPolicyLocal()
if p.cleanupStaleUDPSvcConntrack && needClearConntrackEntries(pSvcInfo.OFProtocol) {
// We clean the UDP conntrack entries for the following Service update cases:
// - Service port changed, clean the conntrack entries matched by each of the current clusterIP / externalIPs
// / loadBalancerIPs and the stale Service port.
// - ClusterIP changed, clean the conntrack entries matched by the clusterIP and the Service port.
// - Some externalIPs / loadBalancerIPs are removed, clean the conntrack entries matched by each of the
// removed Service IPs and the current Service port.
// - Service nodePort changed, clean the conntrack entries matched by each of the Node IPs / the virtual
// NodePort DNAT IP and the stale Service nodePort.
// However, we DO NOT clean the UDP conntrack entries related to remote Endpoints that are still
// referenced by the Service but are no longer selectable Endpoints for the corresponding Service IPs
// (for externalTrafficPolicy, these IPs are loadBalancerIPs, externalIPs and NodeIPs; for
// internalTrafficPolicy, these IPs clusterIPs) when externalTrafficPolicy or internalTrafficPolicy is
// changed from Cluster to Local. Consequently, the connections, which are supposed to select local
// Endpoints, will continue to send packets to remote Endpoints due to the existing UDP conntrack entries
// until timeout.
needCleanupStaleUDPServiceConntrack = svcInfo.Port() != pSvcInfo.Port() ||
svcInfo.ClusterIP().String() != pSvcInfo.ClusterIP().String() ||
needUpdateServiceExternalAddresses
Expand All @@ -761,7 +792,8 @@ func (p *proxier) installServices() {
if len(staleEndpoints) > 0 || len(newEndpoints) > 0 {
needUpdateEndpoints = true
}
// If there are stale Endpoints for a UDP Service, conntrack connections of these stale Endpoints should be deleted.
// We also clean the conntrack entries related to the stale Endpoints for a UDP Service. Conntrack entries
// matched by each of stale Endpoint IPs and each of the remaining Service IPs and ports will be deleted.
if len(staleEndpoints) > 0 && needClearConntrackEntries(svcInfo.OFProtocol) {
needCleanupStaleUDPServiceConntrack = true
}
Expand Down
39 changes: 35 additions & 4 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ func TestDualStackService(t *testing.T) {
fpv6 := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, true)

svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
svc.Spec.IPFamilyPolicy = ptr.To(corev1.IPFamilyPolicyPreferDualStack)
svc.Spec.ClusterIP = svc1IPv4.String()
svc.Spec.ClusterIPs = []string{svc1IPv4.String(), svc1IPv6.String()}
svc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol}
Expand Down Expand Up @@ -1364,8 +1365,9 @@ func TestDualStackService(t *testing.T) {
fpv6.OnEndpointSliceUpdate(nil, epv6)
fpv6.OnEndpointsSynced()

mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv4.String(), "", "", svcPort, false, true, true, false, nil)}).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1)
expectedIPv4Eps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv4.String(), "", "", svcPort, false, true, true, false, nil)}
mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, expectedIPv4Eps).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, expectedIPv4Eps).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{
ServiceIP: svc1IPv4,
ServicePort: uint16(svcPort),
Expand All @@ -1375,8 +1377,9 @@ func TestDualStackService(t *testing.T) {
ClusterGroupID: 1,
}).Times(1)

mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv6.String(), "", "", svcPort, false, true, true, false, nil)}).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, gomock.Any()).Times(1)
expectedIPv6Eps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv6.String(), "", "", svcPort, false, true, true, false, nil)}
mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedIPv6Eps).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, expectedIPv6Eps).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{
ServiceIP: svc1IPv6,
ServicePort: uint16(svcPort),
Expand All @@ -1390,6 +1393,32 @@ func TestDualStackService(t *testing.T) {
fpv6.syncProxyRules()
assert.Contains(t, fpv4.serviceInstalledMap, svcPortName)
assert.Contains(t, fpv6.serviceInstalledMap, svcPortName)

updatedSvc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
svc.Spec.IPFamilyPolicy = ptr.To(corev1.IPFamilyPolicySingleStack)
svc.Spec.ClusterIP = svc1IPv4.String()
svc.Spec.ClusterIPs = []string{svc1IPv4.String()}
svc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol}
svc.Spec.Ports = []corev1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: corev1.ProtocolTCP,
}}
})
fpv4.OnServiceUpdate(svc, updatedSvc)
fpv4.OnServiceSynced()
fpv6.OnServiceUpdate(svc, updatedSvc)
fpv6.OnServiceSynced()

mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv6, uint16(svcPort), binding.ProtocolTCPv6).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(binding.GroupIDType(2)).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCPv6, expectedIPv6Eps).Times(1)

fpv4.syncProxyRules()
fpv6.syncProxyRules()

assert.Contains(t, fpv4.serviceInstalledMap, svcPortName)
assert.NotContains(t, fpv6.serviceInstalledMap, svcPortName)
}

func getAPIProtocol(bindingProtocol binding.Protocol) corev1.Protocol {
Expand Down Expand Up @@ -1626,6 +1655,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b
if needClearConntrackEntries(bindingProtocol) {
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol)
if externalIP != nil {
mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol)
}
Expand Down Expand Up @@ -1764,6 +1794,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP
if needClearConntrackEntries(bindingProtocol) {
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol)
if externalIP != nil {
mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol)
Expand Down
Loading