diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 15966e891db..0fd3699d3e8 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -40,7 +40,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice" - "github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/endpointsfilter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" "github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice" "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" @@ -87,7 +87,7 @@ type YurtHubConfiguration struct { YurtSharedFactory yurtinformers.SharedInformerFactory WorkingMode util.WorkingMode KubeletHealthGracePeriod time.Duration - FilterChain filter.Interface + FilterManager *filter.Manager } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -120,17 +120,17 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort) workingMode := util.WorkingMode(options.WorkingMode) - var filterChain filter.Interface + var filterMapping map[string]filter.Runner var filters *filter.Filters var serviceTopologyFilterEnabled bool var mutatedMasterServiceAddr string + var filterManager *filter.Manager if options.EnableResourceFilter { if options.WorkingMode == string(util.WorkingModeCloud) { options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...) } filters = filter.NewFilters(options.DisabledResourceFilters) registerAllFilters(filters) - serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName) mutatedMasterServiceAddr = us[0].Host if options.AccessServerThroughHub { @@ -147,11 +147,15 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { return nil, err } registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName) - filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr) + filterMapping, err = generateNameToFilterMapping(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr) if err != nil { return nil, err } + if options.EnableResourceFilter { + filterManager = filter.NewFilterManager(sharedFactory, filterMapping) + } + cfg := &YurtHubConfiguration{ LBMode: options.LBMode, RemoteServers: us, @@ -183,7 +187,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { SharedFactory: sharedFactory, YurtSharedFactory: yurtSharedFactory, KubeletHealthGracePeriod: options.KubeletHealthGracePeriod, - FilterChain: filterChain, + FilterManager: filterManager, } return cfg, nil @@ -272,15 +276,13 @@ func registerInformers(informerFactory informers.SharedInformerFactory, } } - if workingMode == util.WorkingModeEdge { - newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { - tweakListOptions := func(options *metav1.ListOptions) { - options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String() - } - return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions) + newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String() } - informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer) + return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions) } + informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer) } // registerAllFilters by order, the front registered filter will be @@ -289,17 +291,17 @@ func registerAllFilters(filters *filter.Filters) { servicetopology.Register(filters) masterservice.Register(filters) discardcloudservice.Register(filters) - ingresscontroller.Register(filters) + endpointsfilter.Register(filters) } -// createFilterChain return union filters that initializations completed. -func createFilterChain(filters *filter.Filters, +// generateNameToFilterMapping return union filters that initializations completed. +func generateNameToFilterMapping(filters *filter.Filters, sharedFactory informers.SharedInformerFactory, yurtSharedFactory yurtinformers.SharedInformerFactory, serializerManager *serializer.SerializerManager, storageWrapper cachemanager.StorageWrapper, workingMode util.WorkingMode, - nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) { + nodeName, mutatedMasterServiceAddr string) (map[string]filter.Runner, error) { if filters == nil { return nil, nil } diff --git a/config/setup/yurthub-cfg.yaml b/config/setup/yurthub-cfg.yaml index 1b09ec7a3da..679df092920 100644 --- a/config/setup/yurthub-cfg.yaml +++ b/config/setup/yurthub-cfg.yaml @@ -46,3 +46,7 @@ metadata: namespace: kube-system data: cache_agents: "" + filter_endpoints: coredns/endpoints#list;watch + filter_servicetopology: coredns/endpointslices#list;watch + filter_discardcloudservice: "" + filter_masterservice: "" diff --git a/config/yaml-template/yurthub-cfg.yaml b/config/yaml-template/yurthub-cfg.yaml index ae59b976090..1017fc7bf0b 100644 --- a/config/yaml-template/yurthub-cfg.yaml +++ b/config/yaml-template/yurthub-cfg.yaml @@ -46,3 +46,7 @@ metadata: namespace: kube-system data: cache_agents: "" + filter_endpoints: coredns/endpoints#list;watch + filter_servicetopology: coredns/endpointslices#list;watch + filter_discardcloudservice: "" + filter_masterservice: "" diff --git a/pkg/yurtctl/util/edgenode/common.go b/pkg/yurtctl/util/edgenode/common.go index c40b68f7d17..7cdaa9a8c35 100644 --- a/pkg/yurtctl/util/edgenode/common.go +++ b/pkg/yurtctl/util/edgenode/common.go @@ -173,5 +173,10 @@ metadata: name: yurt-hub-cfg namespace: kube-system data: - cache_agents: ""` + cache_agents: "" + filter_endpoints: coredns/endpoints#list;watch + filter_servicetopology: coredns/endpointslices#list;watch + filter_discardcloudservice: "" + filter_masterservice: "" +` ) diff --git a/pkg/yurthub/filter/approver.go b/pkg/yurthub/filter/approver.go index 88addb306d7..127be0395c0 100644 --- a/pkg/yurthub/filter/approver.go +++ b/pkg/yurthub/filter/approver.go @@ -16,26 +16,213 @@ limitations under the License. package filter -import "k8s.io/apimachinery/pkg/util/sets" +import ( + "fmt" + "net/http" + "strings" + "sync" -type Approver struct { - comp string - resource string - operations sets.String + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type requestInfo struct { + comp string + resource string + verbs sets.String +} + +type approver struct { + sync.Mutex + nameToRequests map[string][]*requestInfo + whiteListRequests []*requestInfo + configMapSynced cache.InformerSynced + stopCh chan struct{} } -func NewApprover(comp, resource string, verbs ...string) *Approver { - return &Approver{ - comp: comp, - resource: resource, - operations: sets.NewString(verbs...), +var ( + defaultWhiteListRequests = []*requestInfo{ + { + comp: projectinfo.GetHubName(), + resource: "configmaps", + verbs: sets.NewString("list", "watch"), + }, + } + defaultFilterCfg = map[string]string{ + MasterServiceFilterName: "kubelet/services#list;watch", + EndpointsFilterName: "nginx-ingress-controller/endpoints#list;watch", + DiscardCloudServiceFilterName: "kube-proxy/services#list;watch", + ServiceTopologyFilterName: "kube-proxy/endpointslices#list;watch", + } +) + +func newApprover(sharedFactory informers.SharedInformerFactory) *approver { + configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer() + na := &approver{ + nameToRequests: make(map[string][]*requestInfo), + configMapSynced: configMapInformer.HasSynced, + whiteListRequests: make([]*requestInfo, 0), + stopCh: make(chan struct{}), } + na.whiteListRequests = append(na.whiteListRequests, defaultWhiteListRequests...) + configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: na.addConfigMap, + UpdateFunc: na.updateConfigMap, + }) + return na } -func (a *Approver) Approve(comp, resource, verb string) bool { - if a.comp != comp || a.resource != resource { +func (a *approver) Approve(comp, resource, verb string) bool { + if a.isWhitelistReq(comp, resource, verb) { return false } + if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok { + panic("wait for configMap cache sync timeout") + } + for _, requests := range a.nameToRequests { + a.Lock() + for _, request := range requests { + if request.Equal(comp, resource, verb) { + a.Unlock() + return true + } + } + a.Unlock() + } + return false +} + +func (a *approver) GetFilterName(req *http.Request) string { + ctx := req.Context() + comp, ok := util.ClientComponentFrom(ctx) + if !ok { + return "" + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return "" + } + + for name, requests := range a.nameToRequests { + a.Lock() + for _, request := range requests { + if request.Equal(comp, info.Resource, info.Verb) { + a.Unlock() + return name + } + } + a.Unlock() + } + return "" +} + +// Determine whether it is a whitelist resource +func (a *approver) isWhitelistReq(comp, resource, verb string) bool { + for _, req := range a.whiteListRequests { + if req.Equal(comp, resource, verb) { + return true + } + } + return false +} + +// Parse the special format of filter config: filter_{name}: user-agent1/resource#verb1;verb2, user-agent2/resource#verb1;verb2. +func (a *approver) parseFilterConfig(cfg string, filterType string) []*requestInfo { + var reqs []*requestInfo + cfg = a.mergeFilterConfig(cfg, filterType) + for _, configArr := range strings.Split(cfg, ",") { + cfg := strings.Split(configArr, "#") + if len(cfg) != 2 { + continue + } + + v := strings.Split(cfg[0], "/") + if len(v) != 2 { + continue + } + + req := &requestInfo{ + comp: v[0], + resource: v[1], + verbs: sets.NewString(strings.Split(cfg[1], ";")...), + } + reqs = append(reqs, req) + } + return reqs +} + +// merge default filter to custom filter +func (a *approver) mergeFilterConfig(cfg, filterType string) string { + if config, ok := defaultFilterCfg[filterType]; ok { + if len(cfg) != 0 { + return fmt.Sprintf("%v,%v", config, cfg) + } + } + + return cfg +} + +func (a *approver) addConfigMap(obj interface{}) { + cfg, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + for key, value := range cfg.Data { + if strings.HasPrefix(key, "filter_") { + a.updateYurtHubFilterCfg(key, value, "add") + } + } +} + +func (a *approver) updateConfigMap(oldObj, newObj interface{}) { + oldCfg, ok := oldObj.(*corev1.ConfigMap) + if !ok { + return + } + + newCfg, ok := newObj.(*corev1.ConfigMap) + if !ok { + return + } + + for key, value := range newCfg.Data { + if _, ok := oldCfg.Data[key]; !ok { + if strings.HasPrefix(key, "filter_") { + a.updateYurtHubFilterCfg(key, value, "update") + continue + } + } + + if oldCfg.Data[key] != value { + if strings.HasPrefix(key, "filter_") { + a.updateYurtHubFilterCfg(key, value, "update") + } + } + } +} + +// update filter cfg +func (a *approver) updateYurtHubFilterCfg(filterCfgKey, filterCfgValue, action string) { + a.Lock() + defer a.Unlock() + s := strings.Split(filterCfgKey, "_") + a.nameToRequests[s[1]] = a.parseFilterConfig(filterCfgValue, s[1]) + klog.Infof("current filter config: %v after %s", a.nameToRequests, action) +} + +// Judge whether the request is allowed to be filtered +func (req *requestInfo) Equal(comp, resource, verb string) bool { + if req.comp == comp && req.resource == resource && req.verbs.Has(verb) { + return true + } - return a.operations.Has(verb) + return false } diff --git a/pkg/yurthub/filter/approver_test.go b/pkg/yurthub/filter/approver_test.go index 6242460f650..85eb34723a1 100644 --- a/pkg/yurthub/filter/approver_test.go +++ b/pkg/yurthub/filter/approver_test.go @@ -17,48 +17,75 @@ limitations under the License. package filter import ( + "strings" "testing" + + "k8s.io/apimachinery/pkg/util/sets" ) -func TestApprove(t *testing.T) { +func TestUpdateFilterConfig(t *testing.T) { testcases := map[string]struct { - comp string - resource string - verbs []string - comp2 string - resource2 string - verb2 string - expectedResult bool + defaultFilterCfg map[string]string + filterCfgKey string + filterCfgValue string + result map[string][]*requestInfo }{ - "normal case": { - "kubelet", "services", []string{"list", "watch"}, - "kubelet", "services", "list", - true, - }, - "components are not equal": { - "kubelet", "services", []string{"list", "watch"}, - "kube-proxy", "services", "list", - false, - }, - "resources are not equal": { - "kubelet", "services", []string{"list", "watch"}, - "kubelet", "pods", "list", - false, + "update filter_cfg when profile exists": { + filterCfgKey: "filter_discardcloudservice", + filterCfgValue: "w1/services#list;watch,w2/services#list", + result: map[string][]*requestInfo{ + "discardcloudservice": { + { + comp: "kube-proxy", + resource: "services", + verbs: sets.NewString([]string{"list", "watch"}...), + }, + { + comp: "w1", + resource: "services", + verbs: sets.NewString([]string{"list", "watch"}...), + }, + { + comp: "w2", + resource: "services", + verbs: sets.NewString([]string{"list"}...), + }, + }, + }, }, - "verb is not in verbs set": { - "kubelet", "services", []string{"list", "watch"}, - "kubelet", "services", "get", - false, + "when the configuration file is empty": { + filterCfgKey: "filter_endpoints", + filterCfgValue: "", + result: map[string][]*requestInfo{ + "endpoints": { + { + comp: "nginx-ingress-controller", + resource: "endpoints", + verbs: sets.NewString([]string{"list", "watch"}...), + }, + }, + }, }, } for k, tt := range testcases { t.Run(k, func(t *testing.T) { - approver := NewApprover(tt.comp, tt.resource, tt.verbs...) - result := approver.Approve(tt.comp2, tt.resource2, tt.verb2) - - if result != tt.expectedResult { - t.Errorf("Approve error: expected %v, but got %v\n", tt.expectedResult, result) + m := &approver{ + nameToRequests: make(map[string][]*requestInfo), + } + m.updateYurtHubFilterCfg(tt.filterCfgKey, tt.filterCfgValue, "") + fileType := strings.Split(tt.filterCfgKey, "_")[1] + reqs := m.nameToRequests[fileType] + for _, req := range reqs { + var flag bool + for _, res := range tt.result[fileType] { + if req.comp == res.comp && req.resource == res.resource && req.verbs.Equal(res.verbs) { + flag = true + } + } + if !flag { + t.Errorf("After updating the results do not match: %v, Results to be returneds: %v", reqs, tt.result[tt.filterCfgKey]) + } } }) } diff --git a/pkg/yurthub/filter/chain.go b/pkg/yurthub/filter/chain.go index 21b620d5f16..f0a0b5fcc1b 100644 --- a/pkg/yurthub/filter/chain.go +++ b/pkg/yurthub/filter/chain.go @@ -15,47 +15,3 @@ limitations under the License. */ package filter - -import ( - "io" - "net/http" - - apirequest "k8s.io/apiserver/pkg/endpoints/request" - - "github.com/openyurtio/openyurt/pkg/yurthub/util" -) - -type filterChain []Interface - -func (fc filterChain) Approve(comp, resource, verb string) bool { - for _, f := range fc { - if f.Approve(comp, resource, verb) { - return true - } - } - - return false -} - -func (fc filterChain) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { - ctx := req.Context() - comp, ok := util.ClientComponentFrom(ctx) - if !ok { - return 0, rc, nil - } - - info, ok := apirequest.RequestInfoFrom(ctx) - if !ok { - return 0, rc, nil - } - - for _, f := range fc { - if !f.Approve(comp, info.Resource, info.Verb) { - continue - } - - return f.Filter(req, rc, stopCh) - } - - return 0, rc, nil -} diff --git a/pkg/yurthub/filter/constant.go b/pkg/yurthub/filter/constant.go index ccb49a77e69..8f9ea38ee10 100644 --- a/pkg/yurthub/filter/constant.go +++ b/pkg/yurthub/filter/constant.go @@ -34,9 +34,9 @@ const ( // End users should add annotation["openyurt.io/skip-discard"]="true" for LB service. SkipDiscardServiceAnnotation = "openyurt.io/skip-discard" - // ingresscontroller filter is used to reassemble endpoints in order to make the data traffic be + // endpoints filter is used to reassemble endpoints in order to make the data traffic be // load balanced only to the nodepool valid endpoints. - IngressControllerFilterName = "ingresscontroller" + EndpointsFilterName = "endpoints" ) // DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode. diff --git a/pkg/yurthub/filter/discardcloudservice/filter.go b/pkg/yurthub/filter/discardcloudservice/filter.go index 3a92482b013..e6348865c49 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter.go +++ b/pkg/yurthub/filter/discardcloudservice/filter.go @@ -29,19 +29,16 @@ import ( // Register registers a filter func Register(filters *filter.Filters) { - filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Interface, error) { + filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Runner, error) { return NewFilter(), nil }) } func NewFilter() *discardCloudServiceFilter { - return &discardCloudServiceFilter{ - Approver: filter.NewApprover("kube-proxy", "services", []string{"list", "watch"}...), - } + return &discardCloudServiceFilter{} } type discardCloudServiceFilter struct { - *filter.Approver serializerManager *serializer.SerializerManager } diff --git a/pkg/yurthub/filter/ingresscontroller/filter.go b/pkg/yurthub/filter/endpointsfilter/filter.go similarity index 65% rename from pkg/yurthub/filter/ingresscontroller/filter.go rename to pkg/yurthub/filter/endpointsfilter/filter.go index ba6bfae728c..f744fb43020 100644 --- a/pkg/yurthub/filter/ingresscontroller/filter.go +++ b/pkg/yurthub/filter/endpointsfilter/filter.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ingresscontroller +package endpointsfilter import ( "fmt" @@ -38,21 +38,18 @@ import ( // Register registers a filter func Register(filters *filter.Filters) { - filters.Register(filter.IngressControllerFilterName, func() (filter.Interface, error) { + filters.Register(filter.EndpointsFilterName, func() (filter.Runner, error) { return NewFilter(), nil }) } -func NewFilter() *ingressControllerFilter { - return &ingressControllerFilter{ - Approver: filter.NewApprover("nginx-ingress-controller", "endpoints", []string{"list", "watch"}...), +func NewFilter() *endpointsFilter { + return &endpointsFilter{ workingMode: util.WorkingModeEdge, - stopCh: make(chan struct{}), } } -type ingressControllerFilter struct { - *filter.Approver +type endpointsFilter struct { serviceLister listers.ServiceLister serviceSynced cache.InformerSynced nodepoolLister appslisters.NodePoolLister @@ -62,15 +59,14 @@ type ingressControllerFilter struct { nodeName string serializerManager *serializer.SerializerManager workingMode util.WorkingMode - stopCh chan struct{} } -func (ssf *ingressControllerFilter) SetWorkingMode(mode util.WorkingMode) error { +func (ssf *endpointsFilter) SetWorkingMode(mode util.WorkingMode) error { ssf.workingMode = mode return nil } -func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { +func (ssf *endpointsFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { ssf.serviceLister = factory.Core().V1().Services().Lister() ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced @@ -83,22 +79,22 @@ func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.S return nil } -func (ssf *ingressControllerFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error { +func (ssf *endpointsFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error { ssf.nodepoolLister = yurtFactory.Apps().V1alpha1().NodePools().Lister() ssf.nodePoolSynced = yurtFactory.Apps().V1alpha1().NodePools().Informer().HasSynced return nil } -func (ssf *ingressControllerFilter) SetNodeName(nodeName string) error { +func (ssf *endpointsFilter) SetNodeName(nodeName string) error { ssf.nodeName = nodeName return nil } -func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { +func (ssf *endpointsFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error { if len(ssf.nodeName) == 0 { - return fmt.Errorf("node name for ingressControllerFilter is not ready") + return fmt.Errorf("node name for endpointsFilter is not ready") } // hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode @@ -139,30 +135,22 @@ func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrap return nil } -func (ssf *ingressControllerFilter) SetSerializerManager(s *serializer.SerializerManager) error { +func (ssf *endpointsFilter) SetSerializerManager(s *serializer.SerializerManager) error { ssf.serializerManager = s return nil } -func (ssf *ingressControllerFilter) Approve(comp, resource, verb string) bool { - if !ssf.Approver.Approve(comp, resource, verb) { - return false - } - - if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { - return false +func (ssf *endpointsFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + if ok := cache.WaitForCacheSync(stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { + return 0, rc, nil } - return true -} - -func (ssf *ingressControllerFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { s := filterutil.CreateSerializer(req, ssf.serializerManager) if s == nil { - klog.Errorf("skip filter, failed to create serializer in ingressControllerFilter") + klog.Errorf("skip filter, failed to create serializer in endpointsFilter") return 0, rc, nil } - handler := NewIngressControllerFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) - return filter.NewFilterReadCloser(req, rc, handler, s, filter.IngressControllerFilterName, stopCh) + handler := NewEndpointsFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter) + return filter.NewFilterReadCloser(req, rc, handler, s, filter.EndpointsFilterName, stopCh) } diff --git a/pkg/yurthub/filter/ingresscontroller/handler.go b/pkg/yurthub/filter/endpointsfilter/handler.go similarity index 89% rename from pkg/yurthub/filter/ingresscontroller/handler.go rename to pkg/yurthub/filter/endpointsfilter/handler.go index a2e7da5facc..e0a3665dc0e 100644 --- a/pkg/yurthub/filter/ingresscontroller/handler.go +++ b/pkg/yurthub/filter/endpointsfilter/handler.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package ingresscontroller +package endpointsfilter import ( "io" @@ -30,7 +30,7 @@ import ( appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1" ) -type ingressControllerFilterHandler struct { +type endpointsFilterHandler struct { nodeName string serializer *serializer.Serializer serviceLister listers.ServiceLister @@ -38,13 +38,13 @@ type ingressControllerFilterHandler struct { nodeGetter filter.NodeGetter } -func NewIngressControllerFilterHandler( +func NewEndpointsFilterHandler( nodeName string, serializer *serializer.Serializer, serviceLister listers.ServiceLister, nodePoolLister appslisters.NodePoolLister, nodeGetter filter.NodeGetter) filter.Handler { - return &ingressControllerFilterHandler{ + return &endpointsFilterHandler{ nodeName: nodeName, serializer: serializer, serviceLister: serviceLister, @@ -54,10 +54,10 @@ func NewIngressControllerFilterHandler( } // ObjectResponseFilter filter the endpoints from get response object and return the bytes -func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { +func (fh *endpointsFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) { eps, err := fh.serializer.Decode(b) if err != nil || eps == nil { - klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of ingressControllerFilterHandler, %v", err) + klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of endpointsFilterHandler, %v", err) return b, nil } @@ -79,14 +79,14 @@ func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte } // FilterWatchObject filter the endpoints from watch response object and return the bytes -func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { +func (fh *endpointsFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error { defer func() { close(ch) }() d, err := fh.serializer.WatchDecoder(rc) if err != nil { - klog.Errorf("StreamResponseFilter of ingressControllerFilterHandler ended with error, %v", err) + klog.Errorf("StreamResponseFilter of endpointsFilterHandler ended with error, %v", err) return err } for { @@ -112,7 +112,7 @@ func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, } // reassembleEndpoints will filter the valid endpoints to its nodepool -func (fh *ingressControllerFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints { +func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints { svcName := endpoints.Name _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName) if err != nil { diff --git a/pkg/yurthub/filter/filter.go b/pkg/yurthub/filter/filter.go index 7fcef820aba..fdb9219b12e 100644 --- a/pkg/yurthub/filter/filter.go +++ b/pkg/yurthub/filter/filter.go @@ -32,7 +32,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" ) -type Factory func() (Interface, error) +type Factory func() (Runner, error) type Filters struct { sync.Mutex @@ -49,8 +49,8 @@ func NewFilters(disabledFilters []string) *Filters { } } -func (fs *Filters) NewFromFilters(initializer FilterInitializer) (Interface, error) { - var filters []Interface +func (fs *Filters) NewFromFilters(initializer FilterInitializer) (map[string]Runner, error) { + var filterMapping = make(map[string]Runner) for _, name := range fs.names { if fs.Enabled(name) { factory, found := fs.registry[name] @@ -68,16 +68,15 @@ func (fs *Filters) NewFromFilters(initializer FilterInitializer) (Interface, err return nil, err } klog.V(2).Infof("Filter %s initialize successfully", name) - - filters = append(filters, ins) + filterMapping[name] = ins } } - if len(filters) == 0 { + if len(filterMapping) == 0 { return nil, nil } - return filterChain(filters), nil + return filterMapping, nil } func (fs *Filters) Register(name string, fn Factory) { @@ -105,7 +104,7 @@ func (fs *Filters) Enabled(name string) bool { type FilterInitializers []FilterInitializer -func (fis FilterInitializers) Initialize(ins Interface) error { +func (fis FilterInitializers) Initialize(ins Runner) error { for _, fi := range fis { if err := fi.Initialize(ins); err != nil { return err diff --git a/pkg/yurthub/filter/initializer/initializer.go b/pkg/yurthub/filter/initializer/initializer.go index 16115e072d1..8b885cd313e 100644 --- a/pkg/yurthub/filter/initializer/initializer.go +++ b/pkg/yurthub/filter/initializer/initializer.go @@ -92,7 +92,7 @@ func New(factory informers.SharedInformerFactory, } // Initialize used for executing filter initialization -func (fi *genericFilterInitializer) Initialize(ins filter.Interface) error { +func (fi *genericFilterInitializer) Initialize(ins filter.Runner) error { if wants, ok := ins.(WantsWorkingMode); ok { if err := wants.SetWorkingMode(fi.workingMode); err != nil { return err diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index 2b59f0a590d..23774c3bb5f 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -25,19 +25,21 @@ import ( ) type FilterInitializer interface { - Initialize(filter Interface) error + Initialize(filter Runner) error } -// Interface of data filtering framework. -type Interface interface { - // Approve is used to determine whether the data returned - // from the cloud needs to enter the filtering framework for processing. - Approve(comp, resource, verb string) bool - +// Runner of data filtering framework. +type Runner interface { // Filter is used to filter data returned from the cloud. Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) } +// Runner of data Approver filter. +type Approver interface { + Approve(comp, resource, verb string) bool + GetFilterName(req *http.Request) string +} + // Handler customizes data filtering processing interface for each handler. // In the data filtering framework, data is mainly divided into two types: // Object data: data returned by list/get request. diff --git a/pkg/yurthub/filter/manager.go b/pkg/yurthub/filter/manager.go new file mode 100644 index 00000000000..21194b1b2c4 --- /dev/null +++ b/pkg/yurthub/filter/manager.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "io" + "net/http" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type Manager struct { + Approver + NameToFilter map[string]Runner +} + +func NewFilterManager(sharedFactory informers.SharedInformerFactory, filters map[string]Runner) *Manager { + m := &Manager{ + Approver: newApprover(sharedFactory), + NameToFilter: make(map[string]Runner), + } + + for name, runner := range filters { + m.NameToFilter[name] = runner + } + + return m +} + +func (m *Manager) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + reqName := m.Approver.GetFilterName(req) + if runner, ok := m.NameToFilter[reqName]; ok { + return runner.Filter(req, rc, stopCh) + } + return 0, rc, nil +} + +func (m *Manager) Approve(req *http.Request) bool { + ctx := req.Context() + comp, ok := util.ClientComponentFrom(ctx) + if !ok { + return false + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return false + } + return m.Approver.Approve(comp, info.Resource, info.Verb) +} diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go index af288f8c401..44c283362f0 100644 --- a/pkg/yurthub/filter/masterservice/filter.go +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -31,24 +31,19 @@ import ( // Register registers a filter func Register(filters *filter.Filters) { - filters.Register(filter.MasterServiceFilterName, func() (filter.Interface, error) { + filters.Register(filter.MasterServiceFilterName, func() (filter.Runner, error) { return NewFilter(), nil }) } func NewFilter() *masterServiceFilter { - return &masterServiceFilter{ - Approver: filter.NewApprover("kubelet", "services", []string{"list", "watch"}...), - stopCh: make(chan struct{}), - } + return &masterServiceFilter{} } type masterServiceFilter struct { - *filter.Approver serializerManager *serializer.SerializerManager host string port int32 - stopCh chan struct{} } func (msf *masterServiceFilter) SetSerializerManager(s *serializer.SerializerManager) error { diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index d1af741022e..f9b414d64ac 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -38,21 +38,18 @@ import ( // Register registers a filter func Register(filters *filter.Filters) { - filters.Register(filter.ServiceTopologyFilterName, func() (filter.Interface, error) { + filters.Register(filter.ServiceTopologyFilterName, func() (filter.Runner, error) { return NewFilter(), nil }) } func NewFilter() *serviceTopologyFilter { return &serviceTopologyFilter{ - Approver: filter.NewApprover("kube-proxy", "endpointslices", []string{"list", "watch"}...), workingMode: util.WorkingModeEdge, - stopCh: make(chan struct{}), } } type serviceTopologyFilter struct { - *filter.Approver serviceLister listers.ServiceLister serviceSynced cache.InformerSynced nodepoolLister appslisters.NodePoolLister @@ -62,7 +59,6 @@ type serviceTopologyFilter struct { nodeName string workingMode util.WorkingMode serializerManager *serializer.SerializerManager - stopCh chan struct{} } func (ssf *serviceTopologyFilter) SetWorkingMode(mode util.WorkingMode) error { @@ -144,19 +140,10 @@ func (ssf *serviceTopologyFilter) SetSerializerManager(s *serializer.SerializerM return nil } -func (ssf *serviceTopologyFilter) Approve(comp, resource, verb string) bool { - if !ssf.Approver.Approve(comp, resource, verb) { - return false - } - - if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { - return false - } - - return true -} - func (ssf *serviceTopologyFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { + if ok := cache.WaitForCacheSync(stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok { + return 0, rc, nil + } s := filterutil.CreateSerializer(req, ssf.serializerManager) if s == nil { klog.Errorf("skip filter, failed to create serializer in serviceTopologyFilter") diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index b29e5ac45b6..ca85af697e1 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -67,7 +67,7 @@ func NewYurtReverseProxyHandler( transportMgr, healthChecker, certManager, - yurtHubCfg.FilterChain, + yurtHubCfg.FilterManager, stopCh) if err != nil { return nil, err diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index 567792e01d4..8adac0cc083 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -129,11 +129,11 @@ func NewLoadBalancer( transportMgr transport.Interface, healthChecker healthchecker.HealthChecker, certManager interfaces.YurtCertificateManager, - filterChain filter.Interface, + filterManager *filter.Manager, stopCh <-chan struct{}) (LoadBalancer, error) { backends := make([]*RemoteProxy, 0, len(remoteServers)) for i := range remoteServers { - b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, healthChecker, filterChain, stopCh) + b, err := NewRemoteProxy(remoteServers[i], cacheMgr, transportMgr, healthChecker, filterManager, stopCh) if err != nil { klog.Errorf("could not new proxy backend(%s), %v", remoteServers[i].String(), err) continue diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index a7ba3fbe7e6..a0f71e644dc 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -44,7 +44,7 @@ type RemoteProxy struct { reverseProxy *httputil.ReverseProxy cacheMgr cachemanager.CacheManager remoteServer *url.URL - filterChain filter.Interface + filterManager *filter.Manager currentTransport http.RoundTripper bearerTransport http.RoundTripper upgradeHandler *proxy.UpgradeAwareHandler @@ -64,7 +64,7 @@ func NewRemoteProxy(remoteServer *url.URL, cacheMgr cachemanager.CacheManager, transportMgr transport.Interface, healthChecker healthchecker.HealthChecker, - filterChain filter.Interface, + filterManager *filter.Manager, stopCh <-chan struct{}) (*RemoteProxy, error) { currentTransport := transportMgr.CurrentTransport() if currentTransport == nil { @@ -85,7 +85,7 @@ func NewRemoteProxy(remoteServer *url.URL, reverseProxy: httputil.NewSingleHostReverseProxy(remoteServer), cacheMgr: cacheMgr, remoteServer: remoteServer, - filterChain: filterChain, + filterManager: filterManager, currentTransport: currentTransport, bearerTransport: bearerTransport, upgradeHandler: upgradeAwareHandler, @@ -157,16 +157,18 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { req = req.WithContext(ctx) // filter response data - if rp.filterChain != nil { - size, filterRc, err := rp.filterChain.Filter(req, resp.Body, rp.stopCh) - if err != nil { - klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err) - return err - } - resp.Body = filterRc - if size > 0 { - resp.ContentLength = int64(size) - resp.Header.Set("Content-Length", fmt.Sprint(size)) + if rp.filterManager != nil { + if rp.filterManager.Approve(req) { + size, filterRc, err := rp.filterManager.Filter(req, resp.Body, rp.stopCh) + if err != nil { + klog.Errorf("failed to filter response for %s, %v", util.ReqString(req), err) + return err + } + resp.Body = filterRc + if size > 0 { + resp.ContentLength = int64(size) + resp.Header.Set("Content-Length", fmt.Sprint(size)) + } } }