
Commit

enhancement: add configmap to configure user agents for specifying response cache.

- background:
At present, only responses for requests from fixed user agents (like kube-proxy, flanneld, coredns, kubelet, yurthub, yurt-tunnel-agent) can be cached on local disk by yurthub. If a user's pod accesses kube-apiserver through yurthub, the response cannot be cached on local disk because the user agent is not recognized.

- solution:
1. It's not a good idea to cache the responses of all requests that pass through yurthub, because some clients only want to access kube-apiserver directly, and some list requests may return a large volume of data and put pressure on the local disk.
2. So we add a configmap named yurt-hub-cfg with a `cache_agents` field in the kube-system namespace; users can add request user agents (comma-separated) to this field.
3. For example, if `calico` components want to access kube-apiserver through yurthub and use the yurthub cache ability, you can configure it as follows (a multi-agent example is shown after this block):

```
apiVersion: v1
kind: ConfigMap
metadata:
  name: yurt-hub-cfg
  namespace: kube-system
data:
  cache_agents: "calico"

```
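Multiple agents can be listed in the same field: the `cache_agents` value is split on commas (the `sepForAgent` separator in the diff below). For instance, the following configmap — where `my-agent` is a hypothetical example name — would enable caching for both components:

```
apiVersion: v1
kind: ConfigMap
metadata:
  name: yurt-hub-cfg
  namespace: kube-system
data:
  cache_agents: "calico,my-agent"
```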
rambohe-ch committed Sep 10, 2021
1 parent 51dc6fb commit 14df997
Showing 8 changed files with 167 additions and 201 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
@@ -126,7 +126,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
if err != nil {
return fmt.Errorf("could not new cache manager, %v", err)
}
7 changes: 7 additions & 0 deletions config/setup/yurthub-cfg.yaml
@@ -0,0 +1,7 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: yurt-hub-cfg
namespace: kube-system
data:
cache_agents: ""
160 changes: 90 additions & 70 deletions pkg/yurthub/cachemanager/cache_agent.go
@@ -18,94 +18,62 @@ package cachemanager

import (
"strings"

"github.com/openyurtio/openyurt/pkg/projectinfo"

"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var (
defaultCacheAgents = []string{
"kubelet",
"kube-proxy",
"flanneld",
"coredns",
projectinfo.GetAgentName(),
projectinfo.GetHubName(),
}
cacheAgentsKey = "_internal/cache-manager/cache-agent.conf"
sepForAgent = ","
sepForAgent = ","
)

func (cm *cacheManager) initCacheAgents() error {
agents := make([]string, 0)
b, err := cm.storage.GetRaw(cacheAgentsKey)
if err == nil && len(b) != 0 {
localAgents := strings.Split(string(b), sepForAgent)
if len(localAgents) < len(defaultCacheAgents) {
err = cm.storage.Delete(cacheAgentsKey)
if err != nil {
klog.Errorf("failed to delete agents cache, %v", err)
return err
}
} else {
agents = append(agents, localAgents...)
for _, agent := range localAgents {
cm.cacheAgents[agent] = false
}
}
}
for _, agent := range defaultCacheAgents {
if cm.cacheAgents == nil {
cm.cacheAgents = make(map[string]bool)
}

if _, ok := cm.cacheAgents[agent]; !ok {
agents = append(agents, agent)
}
cm.cacheAgents[agent] = true
if cm.sharedFactory == nil {
return nil
}
configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cm.addConfigmap,
UpdateFunc: cm.updateConfigmap,
})

klog.Infof("reset cache agents to %v", agents)
return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(agents, sepForAgent)))
klog.Infof("init cache agents to %v", cm.cacheAgents)
return nil
}

// UpdateCacheAgents update cache agents
func (cm *cacheManager) UpdateCacheAgents(agents []string) error {
if len(agents) == 0 {
klog.Infof("no cache agent is set for update")
return nil
}
func (cm *cacheManager) UpdateCacheAgents(cache_agents, action string) sets.String {
userAgents := strings.TrimSpace(cache_agents)
agents := strings.Split(userAgents, sepForAgent)
newAgents := sets.NewString(agents...)

hasUpdated := false
updatedAgents := append(defaultCacheAgents, agents...)
cm.Lock()
defer cm.Unlock()
if len(updatedAgents) != len(cm.cacheAgents) {
hasUpdated = true
} else {
for _, agent := range agents {
if _, ok := cm.cacheAgents[agent]; !ok {
hasUpdated = true
break
}
}
oldAgents := cm.cacheAgents.Delete(util.DefaultCacheAgents...)

if oldAgents.Equal(newAgents) {
return sets.String{}
}

if hasUpdated {
for k, v := range cm.cacheAgents {
if !v {
// not default agent
delete(cm.cacheAgents, k)
}
}
// get deleted agents
deletedAgents := oldAgents.Difference(newAgents)

for _, agent := range agents {
cm.cacheAgents[agent] = false
}
return cm.storage.UpdateRaw(cacheAgentsKey, []byte(strings.Join(updatedAgents, sepForAgent)))
}
return nil
// construct new cache agents
cm.cacheAgents = cm.cacheAgents.Delete(deletedAgents.List()...)
cm.cacheAgents = cm.cacheAgents.Insert(agents...)
klog.Infof("current cache agents after %s are: %v", action, cm.cacheAgents)

// return deleted agents
return deletedAgents
}

// ListCacheAgents get all of cache agents
@@ -118,3 +86,55 @@ func (cm *cacheManager) ListCacheAgents() []string {
}
return agents
}

func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}

return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}

func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
// delete cache data for deleted agents
if deletedAgents.Len() > 0 {
keys := deletedAgents.List()
for i := range keys {
if err := cm.storage.DeleteCollection(keys[i]); err != nil {
klog.Errorf("failed to cleanup cache for deleted agent(%s), %v", keys[i], err)
} else {
klog.Infof("cleanup cache for agent(%s) successfully", keys[i])
}
}
}
}
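For readers not familiar with `k8s.io/apimachinery/pkg/util/sets`, here is a minimal standalone sketch (not part of this commit; agent names are illustrative) of the same set arithmetic that `UpdateCacheAgents` uses to work out which agents were removed from the configmap and therefore need their local cache cleaned up:

```
package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Agents currently tracked by the cache manager: the built-in defaults
	// plus two agents configured earlier (hypothetical names).
	defaultAgents := []string{"kubelet", "kube-proxy", "flanneld", "coredns"}
	current := sets.NewString(append(defaultAgents, "agent1", "agent2")...)

	// New value of the cache_agents field after a configmap update.
	newValue := "agent2,agent3"
	updated := sets.NewString(strings.Split(strings.TrimSpace(newValue), ",")...)

	// Drop the defaults before comparing; they are always cached and never deleted.
	configured := current.Delete(defaultAgents...)

	// Agents that were configured before but are absent now: their cached
	// data is what deleteAgentCache would clean up.
	deleted := configured.Difference(updated)

	fmt.Println(deleted.List()) // prints [agent1]
}
```

Here `Difference` returns the elements of `configured` that are not in `updated`, matching the `deletedAgents` computation in the diff above, which `deleteAgentCache` then passes to `storage.DeleteCollection`.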
145 changes: 31 additions & 114 deletions pkg/yurthub/cachemanager/cache_agent_test.go
@@ -17,137 +17,54 @@ limitations under the License.
package cachemanager

import (
"strings"
"testing"

"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestInitCacheAgents(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil, nil)

// default cache agents in fake store
b, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
}

gotAgents := strings.Split(string(b), sepForAgent)
if ok := compareAgents(gotAgents, defaultCacheAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents, defaultCacheAgents)
}

if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
}

// add agents for next init cache
_ = m.UpdateCacheAgents([]string{"agent1"})

_, _ = NewCacheManager(s, nil, nil)

b2, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
}

expectedAgents := append(defaultCacheAgents, "agent1")
gotAgents2 := strings.Split(string(b2), sepForAgent)
if ok := compareAgents(gotAgents2, expectedAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents2, expectedAgents)
}

if !compareAgents(gotAgents2, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents2, m.ListCacheAgents())
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
}
}

func TestUpdateCacheAgents(t *testing.T) {
dStorage, err := disk.NewDiskStorage(rootDir)
if err != nil {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil, nil)

testcases := map[string]struct {
desc string
addAgents []string
expectAgents []string
desc string
initAgents []string
cacheAgents string
resultAgents sets.String
deletedAgents sets.String
}{
"add one agent": {addAgents: []string{"agent1"}, expectAgents: append(defaultCacheAgents, "agent1")},
"update with two agents": {addAgents: []string{"agent2", "agent3"}, expectAgents: append(defaultCacheAgents, "agent2", "agent3")},
"update with more two agents": {addAgents: []string{"agent4", "agent5"}, expectAgents: append(defaultCacheAgents, "agent4", "agent5")},
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.NewString([]string{"agent1", "agent2"}...),
deletedAgents: sets.String{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.NewString([]string{"agent2", "agent3"}...),
deletedAgents: sets.NewString("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.NewString([]string{"agent1", "agent2"}...),
deletedAgents: sets.String{},
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {

// add agents
err := m.UpdateCacheAgents(tt.addAgents)
if err != nil {
t.Fatalf("failed to add cache agents, %v", err)
}

b, err := s.GetRaw(cacheAgentsKey)
if err != nil {
t.Fatalf("failed to get agents, %v", err)
m := &cacheManager{
cacheAgents: sets.NewString(tt.initAgents...),
}

gotAgents := strings.Split(string(b), sepForAgent)
if ok := compareAgents(gotAgents, tt.expectAgents); !ok {
t.Errorf("Got agents: %v, expect agents: %v", gotAgents, tt.expectAgents)
}
// add agents
deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "")

if !compareAgents(gotAgents, m.ListCacheAgents()) {
t.Errorf("Got agents: %v, cache agents map: %v", gotAgents, m.ListCacheAgents())
if !deletedAgents.Equal(tt.deletedAgents) {
t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
}

err = s.Delete(cacheAgentsKey)
if err != nil {
t.Errorf("failed to delete cache agents key, %v", err)
if !m.cacheAgents.Equal(tt.resultAgents) {
t.Errorf("Got cache agents: %v, expect agents: %v", m.cacheAgents, tt.resultAgents)
}
})
}
}

func compareAgents(gotAgents []string, expectedAgents []string) bool {
if len(gotAgents) != len(expectedAgents) {
return false
}

for _, agent := range gotAgents {
notFound := true
for i := range expectedAgents {
if expectedAgents[i] == agent {
notFound = false
break
}
}

if notFound {
return false
}
}

return true
}

func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
}