Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: service topology filter can not work when hub agent work on cloud mode #607

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 85 additions & 18 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
yurtv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions/apps/v1alpha1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
Expand Down Expand Up @@ -75,12 +81,11 @@ type YurtHubConfiguration struct {
SerializerManager *serializer.SerializerManager
RESTMapperManager *meta.RESTMapperManager
TLSConfig *tls.Config
MutatedMasterServiceAddr string
Filters *filter.Filters
SharedFactory informers.SharedInformerFactory
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterChain filter.Interface
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -104,12 +109,11 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxySecureServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxySecurePort)
proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort)
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.NodePoolName)
if err != nil {
return nil, err
}
workingMode := util.WorkingMode(options.WorkingMode)

var filterChain filter.Interface
var filters *filter.Filters
var serviceTopologyFilterEnabled bool
var mutatedMasterServiceAddr string
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
Expand All @@ -118,6 +122,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
mutatedMasterServiceAddr = us[0].Host
if options.AccessServerThroughHub {
if options.EnableDummyIf {
Expand All @@ -128,6 +133,16 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
}

sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr))
if err != nil {
return nil, err
}
registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName)
filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
if err != nil {
return nil, err
}

cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
Expand All @@ -151,15 +166,14 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
EnableDummyIf: options.EnableDummyIf,
EnableIptables: options.EnableIptables,
HubAgentDummyIfName: options.HubAgentDummyIfName,
WorkingMode: util.WorkingMode(options.WorkingMode),
WorkingMode: workingMode,
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
MutatedMasterServiceAddr: mutatedMasterServiceAddr,
Filters: filters,
SharedFactory: sharedFactory,
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterChain: filterChain,
}

return cfg, nil
Expand Down Expand Up @@ -196,7 +210,7 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createSharedInformers(proxyAddr, nodePoolName string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
func createSharedInformers(proxyAddr string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
Expand All @@ -214,20 +228,73 @@ func createSharedInformers(proxyAddr, nodePoolName string) (informers.SharedInfo
return nil, nil, err
}

if len(nodePoolName) == 0 {
return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
return informers.NewSharedInformerFactory(client, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
}

// registerInformers reconstruct node/nodePool/configmap informers
func registerInformers(informerFactory informers.SharedInformerFactory,
yurtInformerFactory yurtinformers.SharedInformerFactory,
workingMode util.WorkingMode,
serviceTopologyFilterEnabled bool,
nodePoolName, nodeName string) {
// skip construct node/nodePool informers if service topology filter disabled
if serviceTopologyFilterEnabled {
if workingMode == util.WorkingModeCloud {
newNodeInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodeName}.String()
}
return coreinformers.NewFilteredNodeInformer(client, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.Node{}, newNodeInformer)
}

if len(nodePoolName) != 0 {
newNodePoolInformer := func(client yurtclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
}
return yurtv1alpha1.NewFilteredNodePoolInformer(client, resyncPeriod, nil, tweakListOptions)
}

yurtInformerFactory.InformerFor(&yurtcorev1alpha1.NodePool{}, newNodePoolInformer)
}
}

if workingMode == util.WorkingModeEdge {
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
}
yurtSharedInformerFactory := yurtinformers.NewSharedInformerFactoryWithOptions(yurtClient, 24*time.Hour,
yurtinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
}))
return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtSharedInformerFactory, nil
}

// registerAllFilters by order, the front registered filter will be
// called before the later registered ones.
// called before the behind registered ones.
func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
}

// createFilterChain return union filters that initializations completed.
func createFilterChain(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) {
if filters == nil {
return nil, nil
}

genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, nodeName, mutatedMasterServiceAddr, workingMode)
initializerChain := filter.FilterInitializers{}
initializerChain = append(initializerChain, genericInitializer)
return filters.NewFromFilters(initializerChain)
}
30 changes: 4 additions & 26 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubelet"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
Expand Down Expand Up @@ -158,15 +156,8 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new filter chain for mutating response body", trace)
filterChain, err := createFilterChain(cfg)
if err != nil {
return fmt.Errorf("could not new filter chain, %v", err)
}
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, filterChain, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, stopCh)
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %v", err)
}
Expand All @@ -183,11 +174,9 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("%d. new %s server and begin to serve, dummy proxy server: %s, secure dummy proxy server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerDummyAddr, cfg.YurtHubProxyServerSecureDummyAddr)
}

// start shared informers here
if filterChain != nil && cfg.Filters.Enabled(filter.ServiceTopologyFilterName) {
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)
}
// start shared informers before start hub server
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)

klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler)
Expand All @@ -198,14 +187,3 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("hub agent exited")
return nil
}

func createFilterChain(cfg *config.YurtHubConfiguration) (filter.Interface, error) {
if cfg.Filters == nil {
return nil, nil
}

genericInitializer := initializer.New(cfg.SharedFactory, cfg.YurtSharedFactory, cfg.SerializerManager, cfg.StorageWrapper, cfg.NodeName, cfg.MutatedMasterServiceAddr)
initializerChain := filter.FilterInitializers{}
initializerChain = append(initializerChain, genericInitializer)
return cfg.Filters.NewFromFilters(initializerChain)
}
92 changes: 33 additions & 59 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,24 @@ package cachemanager

import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var (
const (
sepForAgent = ","
)

func (cm *cacheManager) initCacheAgents() error {
if cm.sharedFactory == nil {
return nil
}
configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
configmapInformer := cm.sharedFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cm.addConfigmap,
UpdateFunc: cm.updateConfigmap,
Expand All @@ -50,8 +45,37 @@ func (cm *cacheManager) initCacheAgents() error {
return nil
}

// UpdateCacheAgents update cache agents
func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.String {
func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

// updateCacheAgents update cache agents
func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.String {
newAgents := sets.NewString()
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
Expand All @@ -63,7 +87,6 @@ func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.Strin
cm.Lock()
defer cm.Unlock()
cm.cacheAgents = cm.cacheAgents.Delete(util.DefaultCacheAgents...)

if cm.cacheAgents.Equal(newAgents) {
// add default cache agents
cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...)
Expand All @@ -84,55 +107,6 @@ func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.Strin
return deletedAgents
}

// ListCacheAgents get all of cache agents
func (cm *cacheManager) ListCacheAgents() []string {
cm.RLock()
defer cm.RUnlock()
agents := make([]string, 0)
for k := range cm.cacheAgents {
agents = append(agents, k)
}
return agents
}

func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}

return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}

func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
// delete cache data for deleted agents
if deletedAgents.Len() > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestUpdateCacheAgents(t *testing.T) {
}

// add agents
deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "")
deletedAgents := m.updateCacheAgents(tt.cacheAgents, "")

if !deletedAgents.Equal(tt.deletedAgents) {
t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
Expand Down
Loading