Skip to content

Commit

Permalink
Merge pull request #1995 from rambohe-ch/nodeport-isolation-dev
Browse files Browse the repository at this point in the history
return back watch.Deleted event to clients when watch object is removed in OjbectFilters
  • Loading branch information
Congrool authored Mar 27, 2024
2 parents 9769ab2 + 0773528 commit c589f8a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 36 deletions.
9 changes: 4 additions & 5 deletions pkg/yurthub/filter/inclusterconfig/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,16 @@ func (iccf *inClusterConfigFilter) SupportedResourceAndVerbs() map[string]sets.S
func (iccf *inClusterConfigFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
switch v := obj.(type) {
case *v1.ConfigMap:
cm, _ := mutateKubeProxyConfigMap(v)
return cm
return mutateKubeProxyConfigMap(v)
default:
return v
}
}

func mutateKubeProxyConfigMap(cm *v1.ConfigMap) (*v1.ConfigMap, bool) {
mutated := false
func mutateKubeProxyConfigMap(cm *v1.ConfigMap) *v1.ConfigMap {
if cm.Namespace == KubeProxyConfigMapNamespace && cm.Name == KubeProxyConfigMapName {
if cm.Data != nil && len(cm.Data[KubeProxyDataKey]) != 0 {
mutated := false
parts := make([]string, 0)
for _, line := range strings.Split(cm.Data[KubeProxyDataKey], "\n") {
items := strings.Split(strings.Trim(line, " "), ":")
Expand All @@ -94,5 +93,5 @@ func mutateKubeProxyConfigMap(cm *v1.ConfigMap) (*v1.ConfigMap, bool) {
}
}

return cm, mutated
return cm
}
5 changes: 1 addition & 4 deletions pkg/yurthub/filter/masterservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,8 @@ func (msf *masterServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) ru
}
}

func (msf *masterServiceFilter) mutateMasterService(svc *v1.Service) bool {
mutated := false
func (msf *masterServiceFilter) mutateMasterService(svc *v1.Service) {
if svc.Namespace == MasterServiceNamespace && svc.Name == MasterServiceName {
mutated = true
svc.Spec.ClusterIP = msf.host
for j := range svc.Spec.Ports {
if svc.Spec.Ports[j].Name == MasterServicePortName {
Expand All @@ -102,5 +100,4 @@ func (msf *masterServiceFilter) mutateMasterService(svc *v1.Service) bool {
}
klog.Infof("mutate master service with ClusterIP:Port=%s:%d", msf.host, msf.port)
}
return mutated
}
15 changes: 7 additions & 8 deletions pkg/yurthub/filter/nodeportisolation/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,19 @@ func (nif *nodePortIsolationFilter) Filter(obj runtime.Object, stopCh <-chan str
}

func (nif *nodePortIsolationFilter) isolateNodePortService(svc *v1.Service) *v1.Service {
nodePoolName := nif.resolveNodePoolName()
// node is not located in NodePool, keep the NodePort service the same as native K8s
if len(nodePoolName) == 0 {
return svc
}

nsName := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
if svc.Spec.Type == v1.ServiceTypeNodePort || svc.Spec.Type == v1.ServiceTypeLoadBalancer {
if _, ok := svc.Annotations[ServiceAnnotationNodePortListen]; ok {
nodePoolName := nif.resolveNodePoolName()
// node is not located in NodePool, keep the NodePort service the same as native K8s
if len(nodePoolName) == 0 {
return svc
}

nodePoolConf := getNodePoolConfiguration(svc.Annotations[ServiceAnnotationNodePortListen])
if nodePoolConf.Len() != 0 && isNodePoolEnabled(nodePoolConf, nodePoolName) {
return svc
} else {
klog.V(2).Infof("service(%s) is disabled in nodePool(%s) by nodePortIsolationFilter", nsName, nodePoolName)
klog.V(2).Infof("service(%s/%s) is disabled in nodePool(%s) by nodePortIsolationFilter", svc.Namespace, svc.Name, nodePoolName)
return nil
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/yurthub/filter/responsefilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ func (frc *filterReadCloser) streamResponseFilter(rc io.ReadCloser, ch chan *byt
// BOOKMARK and ERROR response are unnecessary to filter
if !(watchType == watch.Bookmark || watchType == watch.Error) {
if newObj = frc.objectFilter.Filter(obj, frc.stopCh); yurtutil.IsNil(newObj) {
continue
// if an object is removed in the filter chain, it means that this object is not needed
// to return back to clients(like kube-proxy). but in order to update the client's local cache,
// it's a good idea to return a watch.Deleted event to clients and make clients to remove this object in local cache.
watchType = watch.Deleted
newObj = obj
}
}

Expand Down
72 changes: 54 additions & 18 deletions pkg/yurthub/filter/responsefilter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,18 +2066,19 @@ func TestResponseFilterForWatchRequest(t *testing.T) {
serializerManager := serializer.NewSerializerManager()

testcases := map[string]struct {
objectFilters []filter.ObjectFilter
group string
version string
resource string
userAgent string
verb string
path string
accept string
eventType watch.EventType
inputObj runtime.Object
names sets.String
expectedObj runtime.Object
objectFilters []filter.ObjectFilter
group string
version string
resource string
userAgent string
verb string
path string
accept string
eventType watch.EventType
inputObj runtime.Object
names sets.String
expectedObj runtime.Object
expectedEventType watch.EventType
}{
"verify discardcloudservice filter": {
objectFilters: []filter.ObjectFilter{discardCloudSvcFilter},
Expand All @@ -2102,8 +2103,25 @@ func TestResponseFilterForWatchRequest(t *testing.T) {
Type: corev1.ServiceTypeLoadBalancer,
},
},
names: sets.NewString("discardcloudservice"),
expectedObj: nil,
names: sets.NewString("discardcloudservice"),
expectedObj: &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
discardcloudservice.DiscardServiceAnnotation: "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
expectedEventType: watch.Deleted,
},
"verify discardcloudservice and nodeportisolation filter with nil response": {
objectFilters: []filter.ObjectFilter{discardCloudSvcFilter, nodePortIsolationFilter},
Expand All @@ -2128,8 +2146,25 @@ func TestResponseFilterForWatchRequest(t *testing.T) {
Type: corev1.ServiceTypeLoadBalancer,
},
},
names: sets.NewString("discardcloudservice", "nodeportisolation"),
expectedObj: nil,
names: sets.NewString("discardcloudservice", "nodeportisolation"),
expectedObj: &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
discardcloudservice.DiscardServiceAnnotation: "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
expectedEventType: watch.Deleted,
},
"verify discardcloudservice and nodeportisolation filter normally": {
objectFilters: []filter.ObjectFilter{discardCloudSvcFilter, nodePortIsolationFilter},
Expand Down Expand Up @@ -2166,6 +2201,7 @@ func TestResponseFilterForWatchRequest(t *testing.T) {
Type: corev1.ServiceTypeLoadBalancer,
},
},
expectedEventType: watch.Added,
},
}

Expand Down Expand Up @@ -2234,8 +2270,8 @@ func TestResponseFilterForWatchRequest(t *testing.T) {
continue
}

if eType != tc.eventType {
t.Errorf("expect event type %s, but got %s", tc.eventType, eType)
if eType != tc.expectedEventType {
t.Errorf("expect event type %s, but got %s", tc.expectedEventType, eType)
}

if !reflect.DeepEqual(tc.expectedObj, resObj) {
Expand Down

0 comments on commit c589f8a

Please sign in to comment.