Skip to content

Commit

Permalink
improve response filter to handle list response object (openyurtio#1991)
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored and zyjhtangtang committed Apr 16, 2024
1 parent 2137560 commit 7bc0e01
Show file tree
Hide file tree
Showing 15 changed files with 3,255 additions and 2,292 deletions.
112 changes: 97 additions & 15 deletions pkg/yurthub/filter/base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package base

import (
"errors"
"fmt"
"testing"
"time"

Expand All @@ -25,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/tools/cache"

"github.com/openyurtio/openyurt/pkg/apis"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
Expand All @@ -47,16 +50,23 @@ func (noh *nopObjectHandler) Filter(obj runtime.Object, stopCh <-chan struct{})
return obj
}

func registerLocalFilters(filters *Filters) {
filters.Register("servicetopology", func() (filter.ObjectFilter, error) {
return &nopObjectHandler{name: "servicetopology"}, nil
})
filters.Register("discardcloudservice", func() (filter.ObjectFilter, error) {
return &nopObjectHandler{name: "discardcloudservice"}, nil
})
filters.Register("masterservice", func() (filter.ObjectFilter, error) {
return &nopObjectHandler{name: "masterservice"}, nil
})
var (
nodesNameErr = errors.New("nodes name error")
)

type nopNodesErrHandler struct {
nopObjectHandler
err error
}

func NewNopNodesErrHandler() filter.ObjectFilter {
return &nopNodesErrHandler{
err: nodesNameErr,
}
}

func (nneh *nopNodesErrHandler) SetNodesGetterAndSynced(filter.NodesInPoolGetter, cache.InformerSynced, bool) error {
return nneh.err
}

type nopInitializer struct{}
Expand All @@ -65,34 +75,87 @@ func (nopInit *nopInitializer) Initialize(_ filter.ObjectFilter) error {
return nil
}

type errInitializer struct{}

func (errInit *errInitializer) Initialize(_ filter.ObjectFilter) error {
return fmt.Errorf("error initialize")
}

func TestNewFromFilters(t *testing.T) {
allFilters := []string{"masterservice", "discardcloudservice", "servicetopology"}
testcases := map[string]struct {
inputFilters []string
disabledFilters []string
initializer filter.Initializer
generatedFilters sets.String
expectedErr bool
}{
"disable master service filter": {
inputFilters: allFilters,
disabledFilters: []string{"masterservice"},
generatedFilters: sets.NewString(allFilters...).Delete("masterservice"),
},
"disable service topology filter": {
inputFilters: allFilters,
disabledFilters: []string{"servicetopology"},
generatedFilters: sets.NewString(allFilters...).Delete("servicetopology"),
},
"disable discard cloud service filter": {
inputFilters: allFilters,
disabledFilters: []string{"discardcloudservice"},
generatedFilters: sets.NewString(allFilters...).Delete("discardcloudservice"),
},
"disable all filters": {
inputFilters: allFilters,
disabledFilters: []string{"*"},
generatedFilters: sets.NewString(),
},
"register duplicated filters": {
inputFilters: append(allFilters, "servicetopology"),
disabledFilters: []string{},
generatedFilters: sets.NewString(allFilters...),
},
"a invalid filter": {
inputFilters: append(allFilters, "invalidFilter"),
disabledFilters: []string{},
generatedFilters: sets.NewString(),
expectedErr: true,
},
"initialize error": {
inputFilters: allFilters,
disabledFilters: []string{},
initializer: &errInitializer{},
generatedFilters: sets.NewString(),
expectedErr: true,
},
}

for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
filters := NewFilters(tt.disabledFilters)
registerLocalFilters(filters)
for i := range tt.inputFilters {
filterName := tt.inputFilters[i]
filters.Register(filterName, func() (filter.ObjectFilter, error) {
if filterName == "invalidFilter" {
return nil, fmt.Errorf("a invalide filter")
}
return &nopObjectHandler{name: filterName}, nil
})
}

runners, err := filters.NewFromFilters(&nopInitializer{})
if err != nil {
initializer := tt.initializer
if initializer == nil {
initializer = &nopInitializer{}
}
runners, err := filters.NewFromFilters(initializer)
if err != nil && tt.expectedErr {
return
} else if err != nil && !tt.expectedErr {
t.Errorf("failed to new from filters, %v", err)
return
} else if err == nil && tt.expectedErr {
t.Errorf("expect an error, but got nil")
return
}

gotRunners := sets.NewString()
Expand Down Expand Up @@ -120,7 +183,26 @@ func TestInitializers(t *testing.T) {
nodeInitializer := initializer.NewNodesInitializer(false, true, yurtFactory)
initializers = append(initializers, nodeInitializer)

if err := initializers.Initialize(&nopObjectHandler{}); err != nil {
t.Errorf("initialize error, %v", err)
testcases := map[string]struct {
filter filter.ObjectFilter
resultErr error
}{
"initialize normally": {
filter: &nopObjectHandler{},
resultErr: nil,
},
"initialize error": {
filter: NewNopNodesErrHandler(),
resultErr: nodesNameErr,
},
}

for k, tc := range testcases {
t.Run(k, func(t *testing.T) {
err := initializers.Initialize(tc.filter)
if !errors.Is(err, tc.resultErr) {
t.Errorf("initialize expect err %v, but got %v", tc.resultErr, err)
}
})
}
}
10 changes: 0 additions & 10 deletions pkg/yurthub/filter/discardcloudservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,6 @@ func (sf *discardCloudServiceFilter) SupportedResourceAndVerbs() map[string]sets

func (sf *discardCloudServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
switch v := obj.(type) {
case *v1.ServiceList:
var svcNew []v1.Service
for i := range v.Items {
svc := discardCloudService(&v.Items[i])
if svc != nil {
svcNew = append(svcNew, *svc)
}
}
v.Items = svcNew
return v
case *v1.Service:
return discardCloudService(v)
default:
Expand Down
178 changes: 9 additions & 169 deletions pkg/yurthub/filter/discardcloudservice/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,17 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
)

func TestRegister(t *testing.T) {
filters := base.NewFilters([]string{})
Register(filters)
if !filters.Enabled(FilterName) {
t.Errorf("couldn't register %s filter", FilterName)
}
}

func TestName(t *testing.T) {
dcsf, _ := NewDiscardCloudServiceFilter()
if dcsf.Name() != FilterName {
Expand Down Expand Up @@ -58,175 +67,6 @@ func TestFilter(t *testing.T) {
responseObj runtime.Object
expectObj runtime.Object
}{
"discard lb service for serviceList": {
responseObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
DiscardServiceAnnotation: "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
expectObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
},
"discard cloud clusterIP service for serviceList": {
responseObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "x-tunnel-server-internal-svc",
Namespace: "kube-system",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
expectObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
},
"doesn't discard service for serviceList": {
responseObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
DiscardServiceAnnotation: "false",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
expectObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
DiscardServiceAnnotation: "false",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
},
"discard all services for serviceList": {
responseObj: &corev1.ServiceList{
Items: []corev1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "default",
Annotations: map[string]string{
DiscardServiceAnnotation: "true",
},
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.187",
Type: corev1.ServiceTypeLoadBalancer,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "x-tunnel-server-internal-svc",
Namespace: "kube-system",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.96.105.188",
Type: corev1.ServiceTypeClusterIP,
},
},
},
},
expectObj: &corev1.ServiceList{},
},
"discard lb service": {
responseObj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down
9 changes: 0 additions & 9 deletions pkg/yurthub/filter/inclusterconfig/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,6 @@ func (iccf *inClusterConfigFilter) SupportedResourceAndVerbs() map[string]sets.S

func (iccf *inClusterConfigFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
switch v := obj.(type) {
case *v1.ConfigMapList:
for i := range v.Items {
newCM, mutated := mutateKubeProxyConfigMap(&v.Items[i])
if mutated {
v.Items[i] = *newCM
break
}
}
return v
case *v1.ConfigMap:
cm, _ := mutateKubeProxyConfigMap(v)
return cm
Expand Down
Loading

0 comments on commit 7bc0e01

Please sign in to comment.