Skip to content

Commit

Permalink
Use configmap to configure the data source of filter framework
Browse files Browse the repository at this point in the history
  • Loading branch information
yingjianjian committed Feb 26, 2022
1 parent fa65aa3 commit c3878c9
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 64 deletions.
118 changes: 108 additions & 10 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package config

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand All @@ -27,25 +28,27 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
k8s_serializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/register"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
Expand Down Expand Up @@ -90,6 +93,19 @@ type YurtHubConfiguration struct {
FilterChain filter.Interface
}

const (
// External APIs use this configuration to access Kube-apiserver
yurtCfgPath = "/var/lib/yurthub/yurthub.conf"
// Get yurt-hub-cfg path locally
cacheYurtHubCfgKey = "yurthub/configmaps/kube-system/yurt-hub-cfg"
// The type of resource in yurt-hub-cfg
endpointsType = "endpoints"
endpointSlicesType = "endpointslices"
servicesType = "service"
// file name corresponding to servicefilterfile
serviceFilterCfgKey = "config.yaml"
)

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
us, err := parseRemoteServers(options.ServerAddr)
Expand Down Expand Up @@ -129,7 +145,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
options.DisabledResourceFilters = append(options.DisabledResourceFilters, filter.DisabledInCloudMode...)
}
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)
registerAllFilters(filters, storageManager)

serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
mutatedMasterServiceAddr = us[0].Host
Expand Down Expand Up @@ -285,11 +301,72 @@ func registerInformers(informerFactory informers.SharedInformerFactory,

// registerAllFilters by order, the front registered filter will be
// called before the behind registered ones.
func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
ingresscontroller.Register(filters)
func registerAllFilters(filters *filter.Filters, store storage.Store) {
yurtHubCfg, err := GetYurtCfgObject(store)
if err != nil {
panic(err)
}
serviceTopologyFilterConfig := parseYamlConfigMapData(yurtHubCfg.Data[serviceFilterCfgKey])
klog.V(5).Infof("serviceTopologyFilterConfig: %v", serviceTopologyFilterConfig)
for _, serviceFilter := range serviceTopologyFilterConfig.ServiceTopologyFilter {
for _, resource := range serviceFilter.Resources {
filterName := fmt.Sprintf("%v-%v", serviceFilter.Name, resource)
switch resource {
case endpointSlicesType:
klog.Infof("the name of the registered filter endpointSlices is %v,comp is %v,resource is %v, verbs is %v, type is %v", serviceFilter.Name, serviceFilter.UserAgent, resource, serviceFilter.Verbs, serviceFilter.Type)
register.EndpointSlicesTopology(filters, filterName, serviceFilter.UserAgent, resource, serviceFilter.Type, serviceFilter.Verbs)
case endpointsType:
klog.Infof("the name of the registered filter endpoints is %v,comp is %v,resource is %v, verbs is %v, type is %v", serviceFilter.Name, serviceFilter.UserAgent, resource, serviceFilter.Verbs, serviceFilter.Type)
register.EndpointTopology(filters, filterName, serviceFilter.UserAgent, resource, serviceFilter.Type, serviceFilter.Verbs)
case servicesType:
register.ServiceFilter(filters, filterName, serviceFilter.UserAgent, resource, serviceFilter.Type, serviceFilter.Verbs)
klog.Infof("the name of the registered filter endpoints is %v,comp is %v,resource is %v, verbs is %v, type is %v", serviceFilter.Name, serviceFilter.UserAgent, resource, serviceFilter.Verbs, serviceFilter.Type)
default:
klog.Errorf("wrong resource type: %v", resource)
}
}
}
}

func GetYurtCfgObjectByApi() (*corev1.ConfigMap, error) {
Client, err := util.NewClientOutOfCluster(yurtCfgPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize kubernetes client: %v", err)
}
config, err := Client.CoreV1().ConfigMaps(util.YurtHubNamespace).Get(context.Background(), util.YurthubConfigMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get %v object from configmap: %v", util.YurthubConfigMapName, err)
}
return config, nil
}

// Get the data from the local cache first, and then get the data from the API
func GetYurtCfgObject(store storage.Store) (*corev1.ConfigMap, error) {
yurtHubcfg, err := store.Get(cacheYurtHubCfgKey)
if err == storage.ErrStorageNotFound {
return GetYurtCfgObjectByApi()
}
if err != nil {
return nil, err
}
obj, err := yamlToObject(yurtHubcfg)
if err != nil {
return nil, err
}
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
return nil, fmt.Errorf("fail to assert configmap")
}
return cm, nil
}

func yamlToObject(cfg []byte) (k8sruntime.Object, error) {
decode := k8s_serializer.NewCodecFactory(scheme.Scheme).UniversalDeserializer().Decode
obj, _, err := decode(cfg, nil, nil)
if err != nil {
return nil, err
}
return obj, nil
}

// createFilterChain return union filters that initializations completed.
Expand All @@ -309,3 +386,24 @@ func createFilterChain(filters *filter.Filters,
initializerChain = append(initializerChain, genericInitializer)
return filters.NewFromFilters(initializerChain)
}

type ServiceTopologyFilterConfig struct {
ServiceTopologyFilter []ServiceTopologyFilterSpec
}

type ServiceTopologyFilterSpec struct {
UserAgent string `json:"user_agent,omitempty"`
Resources []string `json:"resources,omitempty"`
Verbs []string `json:"verbs,omitempty"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
}

func parseYamlConfigMapData(cfg string) ServiceTopologyFilterConfig {
configFileByYml := ServiceTopologyFilterConfig{}
err := yaml.Unmarshal([]byte(cfg), &configFileByYml)
if err != nil {
panic(err)
}
return configFileByYml
}
47 changes: 47 additions & 0 deletions config/setup/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,50 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
config.yaml: |
serviceTopologyFilter:
- user_agent: coredns
name: coredns
type: servicetopology
resources:
- endpointslices
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: servicetopology
type: servicetopology
resources:
- endpointslices
verbs:
- "list"
- "watch"
- user_agent: nginx-ingress-controller
name: ingresscontroller
type: servicetopology
resources:
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: discardcloud
type: discardcloudservice
resources:
- services
verbs:
- "list"
- "watch"
- user_agent: kubelet
name: masterservice
type: masterservice
resources:
- services
verbs:
- "list"
- "watch"
47 changes: 47 additions & 0 deletions config/yaml-template/yurthub-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,50 @@ metadata:
namespace: kube-system
data:
cache_agents: ""
config.yaml: |
serviceTopologyFilter:
- user_agent: coredns
name: coredns
type: servicetopology
resources:
- endpointslices
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: servicetopology
type: servicetopology
resources:
- endpointslices
verbs:
- "list"
- "watch"
- user_agent: nginx-ingress-controller
name: ingresscontroller
type: servicetopology
resources:
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: discardcloud
type: discardcloudservice
resources:
- services
verbs:
- "list"
- "watch"
- user_agent: kubelet
name: masterservice
type: masterservice
resources:
- services
verbs:
- "list"
- "watch"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
k8s.io/system-validators v1.2.0
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/apiserver-network-proxy v0.0.15
sigs.k8s.io/yaml v1.2.0

)

Expand Down
50 changes: 49 additions & 1 deletion pkg/yurtctl/util/edgenode/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,53 @@ metadata:
name: yurt-hub-cfg
namespace: kube-system
data:
cache_agents: ""`
cache_agents: ""
config.yaml: |
serviceTopologyFilter:
- user_agent: coredns
name: coredns
type: servicetopology
resources:
- endpointslices
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: servicetopology
type: servicetopology
resources:
- endpointslices
verbs:
- "list"
- "watch"
- user_agent: nginx-ingress-controller
name: ingresscontroller
type: servicetopology
resources:
- endpoints
verbs:
- "list"
- "watch"
- user_agent: kube-proxy
name: discardcloud
type: discardcloudservice
resources:
- services
verbs:
- "list"
- "watch"
- user_agent: kubelet
name: masterservice
type: masterservice
resources:
- services
verbs:
- "list"
- "watch"
`
)
4 changes: 2 additions & 2 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
// End users should add annotation["openyurt.io/skip-discard"]="true" for LB service.
SkipDiscardServiceAnnotation = "openyurt.io/skip-discard"

// ingresscontroller filter is used to reassemble endpoints in order to make the data traffic be
// endpoint filter is used to reassemble endpoints in order to make the data traffic be
// load balanced only to the nodepool valid endpoints.
IngressControllerFilterName = "ingresscontroller"
EndpointFilterName = "endpoint"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
Expand Down
11 changes: 2 additions & 9 deletions pkg/yurthub/filter/discardcloudservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,9 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
)

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(filter.DiscardCloudServiceFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}

func NewFilter() *discardCloudServiceFilter {
func NewFilter(comp, resource string, verbs []string) *discardCloudServiceFilter {
return &discardCloudServiceFilter{
Approver: filter.NewApprover("kube-proxy", "services", []string{"list", "watch"}...),
Approver: filter.NewApprover(comp, resource, verbs...),
}
}

Expand Down
Loading

0 comments on commit c3878c9

Please sign in to comment.