Skip to content

Commit

Permalink
improve data filter framework
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch committed Oct 14, 2022
1 parent 5a7b856 commit 916ddd6
Show file tree
Hide file tree
Showing 18 changed files with 661 additions and 365 deletions.
6 changes: 3 additions & 3 deletions charts/openyurt/templates/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ metadata:
namespace: {{ .Release.Namespace | quote }}
data:
cache_agents: ""
filter_servicetopology: ""
filter_discardcloudservice: ""
filter_masterservice: ""
servicetopology: ""
discardcloudservice: ""
masterservice: ""
73 changes: 3 additions & 70 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ import (
ipUtils "github.com/openyurtio/openyurt/pkg/util/ip"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
Expand Down Expand Up @@ -90,7 +87,7 @@ type YurtHubConfiguration struct {
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterManager *filter.Manager
FilterManager *manager.Manager
CertIPs []net.IP
}

Expand Down Expand Up @@ -129,8 +126,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
tenantNs := util.ParseTenantNs(options.YurtHubCertOrganizations)
registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled(options), options.NodePoolName, options.NodeName, tenantNs)
filterManager, err := createFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, us[0].Host, proxySecureServerDummyAddr, proxySecureServerAddr)

filterManager, err := manager.NewFilterManager(options, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, us[0].Host)
if err != nil {
klog.Errorf("could not create filter manager, %v", err)
return nil, err
Expand Down Expand Up @@ -290,69 +286,6 @@ func registerInformers(informerFactory informers.SharedInformerFactory,

}

// registerAllFilters by order, the front registered filter will be
// called before the behind registered ones.
func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
}

// generateNameToFilterMapping return union filters that initializations completed.
func generateNameToFilterMapping(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
nodeName, mutatedMasterServiceAddr string) (map[string]filter.Runner, error) {
if filters == nil {
return nil, nil
}

genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, nodeName, mutatedMasterServiceAddr, workingMode)
initializerChain := filter.FilterInitializers{}
initializerChain = append(initializerChain, genericInitializer)
return filters.NewFromFilters(initializerChain)
}

// createFilterManager will create a filter manager for data filtering framework.
func createFilterManager(options *options.YurtHubOptions,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
apiserverAddr string,
proxySecureServerDummyAddr string,
proxySecureServerAddr string,
) (*filter.Manager, error) {
if !options.EnableResourceFilter {
return nil, nil
}

if options.WorkingMode == string(util.WorkingModeCloud) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters := filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

mutatedMasterServiceAddr := apiserverAddr
if options.AccessServerThroughHub {
if options.EnableDummyIf {
mutatedMasterServiceAddr = proxySecureServerDummyAddr
} else {
mutatedMasterServiceAddr = proxySecureServerAddr
}
}

filterMapping, err := generateNameToFilterMapping(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, util.WorkingMode(options.WorkingMode), options.NodeName, mutatedMasterServiceAddr)
if err != nil {
return nil, err
}

return filter.NewFilterManager(sharedFactory, filterMapping), nil
}

// serviceTopologyFilterEnabled is used to verify the service topology filter should be enabled or not.
func serviceTopologyFilterEnabled(options *options.YurtHubOptions) bool {
if !options.EnableResourceFilter {
Expand Down
6 changes: 3 additions & 3 deletions pkg/yurtctl/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
filter_servicetopology: ""
filter_discardcloudservice: ""
filter_masterservice: ""
servicetopology: ""
discardcloudservice: ""
masterservice: ""
`
)
157 changes: 69 additions & 88 deletions pkg/yurthub/filter/approver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,35 @@ import (

type approver struct {
sync.Mutex
reqKeyToName map[string]string
configMapSynced cache.InformerSynced
stopCh chan struct{}
reqKeyToName map[string]string
configMapSynced cache.InformerSynced
supportedResourceAndVerbsForFilter map[string]map[string]sets.String
defaultReqKeyToName map[string]string
stopCh chan struct{}
}

var (
supportedVerbs = sets.NewString("get", "list", "watch")
defaultWhiteListRequests = sets.NewString(reqKey(projectinfo.GetHubName(), "configmaps", "list"), reqKey(projectinfo.GetHubName(), "configmaps", "watch"))
defaultReqKeyToName = map[string]string{
reqKey("kubelet", "services", "list"): MasterServiceFilterName,
reqKey("kubelet", "services", "watch"): MasterServiceFilterName,
reqKey("kube-proxy", "services", "list"): DiscardCloudServiceFilterName,
reqKey("kube-proxy", "services", "watch"): DiscardCloudServiceFilterName,
reqKey("nginx-ingress-controller", "endpoints", "list"): ServiceTopologyFilterName,
reqKey("nginx-ingress-controller", "endpoints", "watch"): ServiceTopologyFilterName,
reqKey("kube-proxy", "endpointslices", "list"): ServiceTopologyFilterName,
reqKey("kube-proxy", "endpointslices", "watch"): ServiceTopologyFilterName,
reqKey("coredns", "endpoints", "list"): ServiceTopologyFilterName,
reqKey("coredns", "endpoints", "watch"): ServiceTopologyFilterName,
reqKey("coredns", "endpointslices", "list"): ServiceTopologyFilterName,
reqKey("coredns", "endpointslices", "watch"): ServiceTopologyFilterName,
}
// defaultBlackListRequests is used for requests that don't need to be filtered.
defaultBlackListRequests = sets.NewString(reqKey(projectinfo.GetHubName(), "configmaps", "list"), reqKey(projectinfo.GetHubName(), "configmaps", "watch"))
)

func newApprover(sharedFactory informers.SharedInformerFactory) *approver {
func NewApprover(sharedFactory informers.SharedInformerFactory, filterSupportedResAndVerbs map[string]map[string]sets.String) Approver {
configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer()
na := &approver{
reqKeyToName: make(map[string]string),
configMapSynced: configMapInformer.HasSynced,
stopCh: make(chan struct{}),
reqKeyToName: make(map[string]string),
configMapSynced: configMapInformer.HasSynced,
supportedResourceAndVerbsForFilter: filterSupportedResAndVerbs,
stopCh: make(chan struct{}),
}
defaultReqKeyToFilterName := make(map[string]string)
for name, setting := range SupportedComponentsForFilter {
for _, key := range na.parseRequestSetting(name, setting) {
defaultReqKeyToFilterName[key] = name
}
}
na.merge("init", defaultReqKeyToName)
na.defaultReqKeyToName = defaultReqKeyToFilterName

na.merge("init", na.defaultReqKeyToName)
configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: na.addConfigMap,
UpdateFunc: na.updateConfigMap,
Expand All @@ -76,46 +73,27 @@ func newApprover(sharedFactory informers.SharedInformerFactory) *approver {
return na
}

func (a *approver) Approve(req *http.Request) bool {
if isWhitelistReq(req) {
return false
}
if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
return false
}

func (a *approver) Approve(req *http.Request) (bool, string) {
key := getKeyByRequest(req)
if len(key) == 0 {
return false
return false, ""
}

a.Lock()
defer a.Unlock()
if _, ok := a.reqKeyToName[key]; ok {
return true
if defaultBlackListRequests.Has(key) {
return false, ""
}

return false
}

func (a *approver) GetFilterName(req *http.Request) string {
key := getKeyByRequest(req)
if len(key) == 0 {
return ""
if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
return false, ""
}

a.Lock()
defer a.Unlock()
return a.reqKeyToName[key]
}

// Determine whether it is a whitelist resource
func isWhitelistReq(req *http.Request) bool {
key := getKeyByRequest(req)
if ok := defaultWhiteListRequests.Has(key); ok {
return true
if runnerName, ok := a.reqKeyToName[key]; ok {
return true, runnerName
}
return false

return false, ""
}

func (a *approver) addConfigMap(obj interface{}) {
Expand All @@ -127,9 +105,9 @@ func (a *approver) addConfigMap(obj interface{}) {
// get reqKeyToName of user request setting from configmap
reqKeyToNameFromCM := make(map[string]string)
for key, setting := range cm.Data {
if name, ok := hasFilterName(key); ok {
for _, key := range parseRequestSetting(setting) {
reqKeyToNameFromCM[key] = name
if filterName, ok := a.hasFilterName(key); ok {
for _, requestKey := range a.parseRequestSetting(filterName, setting) {
reqKeyToNameFromCM[requestKey] = filterName
}
}
}
Expand All @@ -150,17 +128,17 @@ func (a *approver) updateConfigMap(oldObj, newObj interface{}) {
}

// request settings are changed or not
needUpdated := requestSettingsUpdated(oldCM.Data, newCM.Data)
needUpdated := a.requestSettingsUpdated(oldCM.Data, newCM.Data)
if !needUpdated {
return
}

// get reqKeyToName of user request setting from new configmap
reqKeyToNameFromCM := make(map[string]string)
for key, setting := range newCM.Data {
if name, ok := hasFilterName(key); ok {
for _, key := range parseRequestSetting(setting) {
reqKeyToNameFromCM[key] = name
if filterName, ok := a.hasFilterName(key); ok {
for _, requestKey := range a.parseRequestSetting(filterName, setting) {
reqKeyToNameFromCM[requestKey] = filterName
}
}
}
Expand All @@ -185,7 +163,7 @@ func (a *approver) merge(action string, keyToNameSetting map[string]string) {
defer a.Unlock()
// remove current user setting from reqKeyToName and left default setting only
for key := range a.reqKeyToName {
if _, ok := defaultReqKeyToName[key]; !ok {
if _, ok := a.defaultReqKeyToName[key]; !ok {
delete(a.reqKeyToName, key)
}
}
Expand All @@ -202,29 +180,28 @@ func (a *approver) merge(action string, keyToNameSetting map[string]string) {
klog.Infof("current filter setting: %v after %s", a.reqKeyToName, action)
}

// parseRequestSetting extract comp, resource, verbs from setting, and
// make up request keys.
func parseRequestSetting(setting string) []string {
// parseRequestSetting extract comp info from setting, and make up request keys.
// requestSetting format as following(take servicetopology for example):
// servicetopology: "comp1,comp2"
func (a *approver) parseRequestSetting(name, setting string) []string {
reqKeys := make([]string, 0)
for _, reqSetting := range strings.Split(setting, ",") {
parts := strings.Split(reqSetting, "#")
if len(parts) != 2 {
continue
}
resourceAndVerbs, ok := a.supportedResourceAndVerbsForFilter[name]
if !ok {
return reqKeys
}

items := strings.Split(parts[0], "/")
if len(items) != 2 {
continue
for _, comp := range strings.Split(setting, ",") {
if strings.Contains(comp, "/") {
comp = strings.Split(comp, "/")[0]
}
comp := strings.TrimSpace(items[0])
resource := strings.TrimSpace(items[1])
verbs := strings.Split(parts[1], ";")

if len(comp) != 0 && len(resource) != 0 && len(verbs) != 0 {
for i := range verbs {
verb := strings.TrimSpace(verbs[i])
if ok := supportedVerbs.Has(verb); ok {
reqKeys = append(reqKeys, reqKey(comp, resource, verb))
for resource, verbSet := range resourceAndVerbs {
comp = strings.TrimSpace(comp)
resource = strings.TrimSpace(resource)
verbs := verbSet.List()

if len(comp) != 0 && len(resource) != 0 && len(verbs) != 0 {
for i := range verbs {
reqKeys = append(reqKeys, reqKey(comp, resource, strings.TrimSpace(verbs[i])))
}
}
}
Expand All @@ -234,25 +211,29 @@ func parseRequestSetting(setting string) []string {

// hasFilterName check the key that includes a filter name or not.
// and return filter name and check result.
func hasFilterName(key string) (string, bool) {
if strings.HasPrefix(key, "filter_") {
name := strings.TrimSpace(strings.TrimPrefix(key, "filter_"))
return name, len(name) != 0
func (a *approver) hasFilterName(key string) (string, bool) {
name := strings.TrimSpace(key)
if strings.HasPrefix(name, "filter_") {
name = strings.TrimSpace(strings.TrimPrefix(name, "filter_"))
}

if _, ok := a.supportedResourceAndVerbsForFilter[name]; ok {
return name, true
}

return "", false
}

// requestSettingsUpdated is used to verify filter setting is changed or not.
func requestSettingsUpdated(old, new map[string]string) bool {
func (a *approver) requestSettingsUpdated(old, new map[string]string) bool {
for key := range old {
if _, ok := hasFilterName(key); !ok {
if _, ok := a.hasFilterName(key); !ok {
delete(old, key)
}
}

for key := range new {
if _, ok := hasFilterName(key); !ok {
if _, ok := a.hasFilterName(key); !ok {
delete(new, key)
}
}
Expand Down
Loading

0 comments on commit 916ddd6

Please sign in to comment.