Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use configmap to configure the data source of filter framework #749

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/endpointsfilter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
Expand Down Expand Up @@ -87,7 +87,7 @@ type YurtHubConfiguration struct {
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterChain filter.Interface
FilterManager *filter.Manager
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -120,17 +120,17 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
workingMode := util.WorkingMode(options.WorkingMode)

var filterChain filter.Interface
var filterMapping map[string]filter.Runner
var filters *filter.Filters
var serviceTopologyFilterEnabled bool
var mutatedMasterServiceAddr string
var filterManager *filter.Manager
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
mutatedMasterServiceAddr = us[0].Host
if options.AccessServerThroughHub {
Expand All @@ -147,11 +147,15 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
return nil, err
}
registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName)
filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
filterMapping, err = generateNameToFilterMapping(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
if err != nil {
return nil, err
}

if options.EnableResourceFilter {
filterManager = filter.NewFilterManager(sharedFactory, filterMapping)
}

cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
Expand Down Expand Up @@ -183,7 +187,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
SharedFactory: sharedFactory,
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterChain: filterChain,
FilterManager: filterManager,
}

return cfg, nil
Expand Down Expand Up @@ -272,15 +276,13 @@ func registerInformers(informerFactory informers.SharedInformerFactory,
}
}

if workingMode == util.WorkingModeEdge {
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
}

// registerAllFilters by order, the front registered filter will be
Expand All @@ -289,17 +291,17 @@ func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
ingresscontroller.Register(filters)
endpointsfilter.Register(filters)
}

// createFilterChain return union filters that initializations completed.
func createFilterChain(filters *filter.Filters,
// generateNameToFilterMapping return union filters that initializations completed.
func generateNameToFilterMapping(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) {
nodeName, mutatedMasterServiceAddr string) (map[string]filter.Runner, error) {
if filters == nil {
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions config/setup/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
4 changes: 4 additions & 0 deletions config/yaml-template/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
7 changes: 6 additions & 1 deletion pkg/yurtctl/util/edgenode/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,10 @@ metadata:
name: yurt-hub-cfg
namespace: kube-system
data:
cache_agents: ""`
cache_agents: ""
filter_endpoints: coredns/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""
`
)
211 changes: 198 additions & 13 deletions pkg/yurthub/filter/approver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,211 @@ limitations under the License.

package filter

import "k8s.io/apimachinery/pkg/util/sets"
import (
"fmt"
"net/http"
"strings"
"sync"

type Approver struct {
comp string
resource string
operations sets.String
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

type requestInfo struct {
comp string
resource string
verbs sets.String
}

type approver struct {
sync.Mutex
nameToRequests map[string][]*requestInfo
whiteListRequests []*requestInfo
configMapSynced cache.InformerSynced
stopCh chan struct{}
}

func NewApprover(comp, resource string, verbs ...string) *Approver {
return &Approver{
comp: comp,
resource: resource,
operations: sets.NewString(verbs...),
var (
defaultWhiteListRequests = []*requestInfo{
{
comp: projectinfo.GetHubName(),
resource: "configmaps",
verbs: sets.NewString("list", "watch"),
},
}
defaultFilterCfg = map[string]string{
MasterServiceFilterName: "kubelet/services#list;watch",
EndpointsFilterName: "nginx-ingress-controller/endpoints#list;watch",
DiscardCloudServiceFilterName: "kube-proxy/services#list;watch",
ServiceTopologyFilterName: "kube-proxy/endpointslices#list;watch",
}
)

func newApprover(sharedFactory informers.SharedInformerFactory) *approver {
configMapInformer := sharedFactory.Core().V1().ConfigMaps().Informer()
na := &approver{
nameToRequests: make(map[string][]*requestInfo),
configMapSynced: configMapInformer.HasSynced,
whiteListRequests: make([]*requestInfo, 0),
stopCh: make(chan struct{}),
}
na.whiteListRequests = append(na.whiteListRequests, defaultWhiteListRequests...)
configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: na.addConfigMap,
UpdateFunc: na.updateConfigMap,
})
return na
}

func (a *Approver) Approve(comp, resource, verb string) bool {
if a.comp != comp || a.resource != resource {
func (a *approver) Approve(comp, resource, verb string) bool {
if a.isWhitelistReq(comp, resource, verb) {
return false
}
if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
panic("wait for configMap cache sync timeout")
}
a.Lock()
defer a.Unlock()
for _, requests := range a.nameToRequests {
for _, request := range requests {
if request.Equal(comp, resource, verb) {

return true
}
}
}
return false
}

func (a *approver) GetFilterName(req *http.Request) string {
ctx := req.Context()
comp, ok := util.ClientComponentFrom(ctx)
if !ok {
return ""
}

info, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
return ""
}
a.Lock()
defer a.Unlock()
for name, requests := range a.nameToRequests {
for _, request := range requests {
if request.Equal(comp, info.Resource, info.Verb) {
return name
}
}
}
return ""
}

// Determine whether it is a whitelist resource
func (a *approver) isWhitelistReq(comp, resource, verb string) bool {
for _, req := range a.whiteListRequests {
if req.Equal(comp, resource, verb) {
return true
}
}
return false
}

// Parse the special format of filter config: filter_{name}: user-agent1/resource#verb1;verb2, user-agent2/resource#verb1;verb2.
func (a *approver) parseFilterConfig(cfg string, filterType string) []*requestInfo {
var reqs []*requestInfo
cfg = a.mergeFilterConfig(cfg, filterType)
for _, configArr := range strings.Split(cfg, ",") {
cfg := strings.Split(configArr, "#")
if len(cfg) != 2 {
continue
}

v := strings.Split(cfg[0], "/")
if len(v) != 2 {
continue
}

req := &requestInfo{
comp: v[0],
resource: v[1],
verbs: sets.NewString(strings.Split(cfg[1], ";")...),
}
reqs = append(reqs, req)
}
return reqs
}

// merge default filter to custom filter
func (a *approver) mergeFilterConfig(cfg, filterType string) string {
if config, ok := defaultFilterCfg[filterType]; ok {
if len(cfg) != 0 {
return fmt.Sprintf("%v,%v", config, cfg)
}
}

return cfg
}

func (a *approver) addConfigMap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
for key, value := range cfg.Data {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "add")
}
}
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *approver) updateConfigMap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

for key, value := range newCfg.Data {
if _, ok := oldCfg.Data[key]; !ok {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "update")
continue
}
}

if oldCfg.Data[key] != value {
if strings.HasPrefix(key, "filter_") {
a.updateYurtHubFilterCfg(key, value, "update")
}
}
}
}

// update filter cfg
func (a *approver) updateYurtHubFilterCfg(filterCfgKey, filterCfgValue, action string) {
a.Lock()
defer a.Unlock()
s := strings.Split(filterCfgKey, "_")
a.nameToRequests[s[1]] = a.parseFilterConfig(filterCfgValue, s[1])
klog.Infof("current filter config: %v after %s", a.nameToRequests, action)
}

// Judge whether the request is allowed to be filtered
func (req *requestInfo) Equal(comp, resource, verb string) bool {
if req.comp == comp && req.resource == resource && req.verbs.Has(verb) {
return true
}

return a.operations.Has(verb)
return false
}
Loading