diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index e898062cf8f..3711268b85e 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -126,7 +126,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { trace++ klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) - cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager) + cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) if err != nil { return fmt.Errorf("could not new cache manager, %v", err) } diff --git a/config/setup/yurthub-cfg.yaml b/config/setup/yurthub-cfg.yaml new file mode 100644 index 00000000000..6cebdbb34b4 --- /dev/null +++ b/config/setup/yurthub-cfg.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: yurt-hub-cfg + namespace: kube-system +data: + cache_agents: "" diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go index 69a6fefb950..36e89815c95 100644 --- a/pkg/yurthub/cachemanager/cache_agent.go +++ b/pkg/yurthub/cachemanager/cache_agent.go @@ -18,94 +18,65 @@ package cachemanager import ( "strings" - - "github.com/openyurtio/openyurt/pkg/projectinfo" - + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/sets" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" ) var ( - defaultCacheAgents = []string{ - "kubelet", - "kube-proxy", - "flanneld", - "coredns", - projectinfo.GetAgentName(), - projectinfo.GetHubName(), - } - cacheAgentsKey = "_internal/cache-manager/cache-agent.conf" - sepForAgent = "," + sepForAgent = "," ) func (cm *cacheManager) initCacheAgents() error { - agents := make([]string, 0) - b, err := cm.storage.GetRaw(cacheAgentsKey) - if err == nil && len(b) != 0 { - localAgents := strings.Split(string(b), sepForAgent) - if len(localAgents) < len(defaultCacheAgents) { - err = cm.storage.Delete(cacheAgentsKey) - if err != nil { - klog.Errorf("failed to delete agents cache, %v", err) - return err - } - } else { - agents = append(agents, localAgents...) - for _, agent := range localAgents { - cm.cacheAgents[agent] = false - } - } - } - for _, agent := range defaultCacheAgents { - if cm.cacheAgents == nil { - cm.cacheAgents = make(map[string]bool) - } - - if _, ok := cm.cacheAgents[agent]; !ok { - agents = append(agents, agent) - } - cm.cacheAgents[agent] = true + if cm.sharedFactory == nil { + return nil } + configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer) + configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cm.addConfigmap, + UpdateFunc: cm.updateConfigmap, + }) - klog.Infof("reset cache agents to %v", agents) - return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(agents, sepForAgent))) + klog.Infof("init cache agents to %v", cm.cacheAgents) + return nil } // UpdateCacheAgents update cache agents -func (cm *cacheManager) UpdateCacheAgents(agents []string) error { - if len(agents) == 0 { - klog.Infof("no cache agent is set for update") - return nil - } +func (cm *cacheManager) UpdateCacheAgents(cache_agents, action string) sets.String { + userAgents := strings.TrimSpace(cache_agents) + agents := strings.Split(userAgents, sepForAgent) + newAgents := sets.NewString(agents...) - hasUpdated := false - updatedAgents := append(defaultCacheAgents, agents...) cm.Lock() defer cm.Unlock() - if len(updatedAgents) != len(cm.cacheAgents) { - hasUpdated = true - } else { - for _, agent := range agents { - if _, ok := cm.cacheAgents[agent]; !ok { - hasUpdated = true - break - } - } + oldAgents := cm.cacheAgents.Delete(util.DefaultCacheAgents...) + + if oldAgents.Equal(newAgents) { + // add default cache agents + cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...) + return sets.String{} } - if hasUpdated { - for k, v := range cm.cacheAgents { - if !v { - // not default agent - delete(cm.cacheAgents, k) - } - } + // get deleted agents + deletedAgents := oldAgents.Difference(newAgents) - for _, agent := range agents { - cm.cacheAgents[agent] = false - } - return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(updatedAgents, sepForAgent))) - } - return nil + // construct new cache agents + cm.cacheAgents = cm.cacheAgents.Delete(deletedAgents.List()...) + cm.cacheAgents = cm.cacheAgents.Insert(agents...) + cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...) + klog.Infof("current cache agents after %s are: %v", action, cm.cacheAgents) + + // return deleted agents + return deletedAgents } // ListCacheAgents get all of cache agents @@ -118,3 +89,55 @@ func (cm *cacheManager) ListCacheAgents() []string { } return agents } + +func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String() + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = selector + } + + return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions) +} + +func (cm *cacheManager) addConfigmap(obj interface{}) { + cfg, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + + deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add") + cm.deleteAgentCache(deletedAgents) +} + +func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) { + oldCfg, ok := oldObj.(*corev1.ConfigMap) + if !ok { + return + } + + newCfg, ok := newObj.(*corev1.ConfigMap) + if !ok { + return + } + + if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] { + return + } + + deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update") + cm.deleteAgentCache(deletedAgents) +} + +func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) { + // delete cache data for deleted agents + if deletedAgents.Len() > 0 { + keys := deletedAgents.List() + for i := range keys { + if err := cm.storage.DeleteCollection(keys[i]); err != nil { + klog.Errorf("failed to cleanup cache for deleted agent(%s), %v", keys[i], err) + } else { + klog.Infof("cleanup cache for agent(%s) successfully", keys[i]) + } + } + } +} diff --git a/pkg/yurthub/cachemanager/cache_agent_test.go b/pkg/yurthub/cachemanager/cache_agent_test.go index 09be6c75028..ff38e34991e 100644 --- a/pkg/yurthub/cachemanager/cache_agent_test.go +++ b/pkg/yurthub/cachemanager/cache_agent_test.go @@ -17,137 +17,56 @@ limitations under the License. package cachemanager import ( - "strings" "testing" - "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" + "github.com/openyurtio/openyurt/pkg/yurthub/util" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/endpoints/request" ) -func TestInitCacheAgents(t *testing.T) { - dStorage, err := disk.NewDiskStorage(rootDir) - if err != nil { - t.Errorf("failed to create disk storage, %v", err) - } - s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil, nil) - - // default cache agents in fake store - b, err := s.GetRaw(cacheAgentsKey) - if err != nil { - t.Fatalf("failed to get agents, %v", err) - } - - gotAgents := strings.Split(string(b), sepForAgent) - if ok := compareAgents(gotAgents, defaultCacheAgents); !ok { - t.Errorf("Got agents: %v, expect agents: %v", gotAgents, defaultCacheAgents) - } - - if !compareAgents(gotAgents, m.ListCacheAgents()) { - t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents()) - } - - // add agents for next init cache - _ = m.UpdateCacheAgents([]string{"agent1"}) - - _, _ = NewCacheManager(s, nil, nil) - - b2, err := s.GetRaw(cacheAgentsKey) - if err != nil { - t.Fatalf("failed to get agents, %v", err) - } - - expectedAgents := append(defaultCacheAgents, "agent1") - gotAgents2 := strings.Split(string(b2), sepForAgent) - if ok := compareAgents(gotAgents2, expectedAgents); !ok { - t.Errorf("Got agents: %v, expect agents: %v", gotAgents2, expectedAgents) - } - - if !compareAgents(gotAgents2, m.ListCacheAgents()) { - t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents()) - } - - err = s.Delete(cacheAgentsKey) - if err != nil { - t.Errorf("failed to delete cache agents key, %v", err) - } -} - func TestUpdateCacheAgents(t *testing.T) { - dStorage, err := disk.NewDiskStorage(rootDir) - if err != nil { - t.Errorf("failed to create disk storage, %v", err) - } - s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil, nil) - testcases := map[string]struct { - desc string - addAgents []string - expectAgents []string + desc string + initAgents []string + cacheAgents string + resultAgents sets.String + deletedAgents sets.String }{ - "add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")}, - "update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")}, - "update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")}, + "two new agents updated": { + initAgents: []string{}, + cacheAgents: "agent1,agent2", + resultAgents: sets.NewString(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...), + deletedAgents: sets.String{}, + }, + "two new agents updated but an old agent deleted": { + initAgents: []string{"agent1", "agent2"}, + cacheAgents: "agent2,agent3", + resultAgents: sets.NewString(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...), + deletedAgents: sets.NewString("agent1"), + }, + "no agents updated ": { + initAgents: []string{"agent1", "agent2"}, + cacheAgents: "agent1,agent2", + resultAgents: sets.NewString(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...), + deletedAgents: sets.String{}, + }, } for k, tt := range testcases { t.Run(k, func(t *testing.T) { - - // add agents - err := m.UpdateCacheAgents(tt.addAgents) - if err != nil { - t.Fatalf("failed to add cache agents, %v", err) - } - - b, err := s.GetRaw(cacheAgentsKey) - if err != nil { - t.Fatalf("failed to get agents, %v", err) + m := &cacheManager{ + cacheAgents: sets.NewString(tt.initAgents...), } - gotAgents := strings.Split(string(b), sepForAgent) - if ok := compareAgents(gotAgents, tt.expectAgents); !ok { - t.Errorf("Got agents: %v, expect agents: %v", gotAgents, tt.expectAgents) - } + // add agents + deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "") - if !compareAgents(gotAgents, m.ListCacheAgents()) { - t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents()) + if !deletedAgents.Equal(tt.deletedAgents) { + t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents) } - err = s.Delete(cacheAgentsKey) - if err != nil { - t.Errorf("failed to delete cache agents key, %v", err) + if !m.cacheAgents.Equal(tt.resultAgents) { + t.Errorf("Got cache agents: %v, expect agents: %v", m.cacheAgents, tt.resultAgents) } }) } } - -func compareAgents(gotAgents []string, expectedAgents []string) bool { - if len(gotAgents) != len(expectedAgents) { - return false - } - - for _, agent := range gotAgents { - notFound := true - for i := range expectedAgents { - if expectedAgents[i] == agent { - notFound = false - break - } - } - - if notFound { - return false - } - } - - return true -} - -func newTestRequestInfoResolver() *request.RequestInfoFactory { - return &request.RequestInfoFactory{ - APIPrefixes: sets.NewString("api", "apis"), - GrouplessAPIPrefixes: sets.NewString("api"), - } -} diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 0a4193c661e..eddfbc7048f 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -39,9 +39,11 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog" ) @@ -50,7 +52,7 @@ import ( type CacheManager interface { CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error QueryCache(req *http.Request) (runtime.Object, error) - UpdateCacheAgents(agents []string) error + UpdateCacheAgents(agents, action string) sets.String ListCacheAgents() []string CanCacheFor(req *http.Request) bool DeleteKindFor(gvr schema.GroupVersionResource) error @@ -61,8 +63,9 @@ type cacheManager struct { storage StorageWrapper serializerManager *serializer.SerializerManager restMapperManager *hubmeta.RESTMapperManager - cacheAgents map[string]bool + cacheAgents sets.String listSelectorCollector map[string]string + sharedFactory informers.SharedInformerFactory } // NewCacheManager creates a new CacheManager @@ -70,13 +73,15 @@ func NewCacheManager( storage StorageWrapper, serializerMgr *serializer.SerializerManager, restMapperMgr *hubmeta.RESTMapperManager, + sharedFactory informers.SharedInformerFactory, ) (CacheManager, error) { cm := &cacheManager{ storage: storage, serializerManager: serializerMgr, restMapperManager: restMapperMgr, - cacheAgents: make(map[string]bool), + cacheAgents: sets.NewString(util.DefaultCacheAgents...), listSelectorCollector: make(map[string]string), + sharedFactory: sharedFactory, } err := cm.initCacheAgents() @@ -567,7 +572,7 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool { // request with Edge-Cache header, continue verification } else { cm.RLock() - if _, found := cm.cacheAgents[comp]; !found { + if !cm.cacheAgents.Has(comp) { cm.RUnlock() return false } diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 8870b1877d6..3ddb12beb8d 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -43,8 +43,10 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/filters" + "k8s.io/apiserver/pkg/endpoints/request" ) var ( @@ -62,7 +64,7 @@ func TestCacheGetResponse(t *testing.T) { yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, - cacheAgents: make(map[string]bool), + cacheAgents: sets.String{}, restMapperManager: restRESTMapperMgr, } @@ -550,7 +552,7 @@ func TestCacheWatchResponse(t *testing.T) { yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, - cacheAgents: make(map[string]bool), + cacheAgents: sets.String{}, restMapperManager: restRESTMapperMgr, } @@ -855,7 +857,7 @@ func TestCacheListResponse(t *testing.T) { yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, - cacheAgents: make(map[string]bool), + cacheAgents: sets.String{}, restMapperManager: restRESTMapperMgr, } @@ -1342,7 +1344,7 @@ func TestQueryCacheForGet(t *testing.T) { yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, - cacheAgents: make(map[string]bool), + cacheAgents: sets.String{}, restMapperManager: restRESTMapperMgr, } @@ -1874,7 +1876,7 @@ func TestQueryCacheForList(t *testing.T) { yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, - cacheAgents: make(map[string]bool), + cacheAgents: sets.String{}, restMapperManager: restRESTMapperMgr, } @@ -2353,7 +2355,7 @@ func TestCanCacheFor(t *testing.T) { t.Errorf("failed to create disk storage, %v", err) } s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil, nil) + m, _ := NewCacheManager(s, nil, nil, nil) type proxyRequest struct { userAgent string @@ -2607,3 +2609,10 @@ func checkReqCanCache(m CacheManager, userAgent, verb, path string, header map[s return reqCanCache } + +func newTestRequestInfoResolver() *request.RequestInfoFactory { + return &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + GrouplessAPIPrefixes: sets.NewString("api"), + } +} diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index f49766575d7..e82eb126308 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -59,7 +59,7 @@ func TestServeHTTPForWatch(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) fn := func() bool { return false @@ -147,7 +147,7 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) cnt := 0 fn := func() bool { @@ -229,7 +229,7 @@ func TestServeHTTPForPost(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) fn := func() bool { return false @@ -305,7 +305,7 @@ func TestServeHTTPForDelete(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) fn := func() bool { return false @@ -368,7 +368,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil, nil) fn := func() bool { return false @@ -504,7 +504,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, restRESTMapperMgr) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, nil) fn := func() bool { return false diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 7d7f74f7035..20215571ca5 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -26,6 +26,7 @@ import ( "path/filepath" "strings" + "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/metrics" @@ -63,6 +64,13 @@ const ( ProxyReqCanCache // ProxyListSelector represents label selector and filed selector string for list request ProxyListSelector + YurtHubNamespace = "kube-system" + CacheUserAgentsKey = "cache_agents" +) + +var ( + DefaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", projectinfo.GetAgentName(), projectinfo.GetHubName()} + YurthubConfigMapName = fmt.Sprintf("%s-hub-cfg", strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) ) // WithValue returns a copy of parent in which the value associated with key is val.