Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize cache of yurthub #2068

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
certificatemgr "github.com/openyurtio/openyurt/pkg/yurthub/certificate/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
Expand All @@ -55,6 +54,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/wrapper"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -70,7 +70,8 @@ type YurtHubConfiguration struct {
HeartbeatIntervalSeconds int
MaxRequestInFlight int
EnableProfiling bool
StorageWrapper cachemanager.StorageWrapper
Queue wrapper.Interface
StorageWrapper wrapper.StorageWrapper
SerializerManager *serializer.SerializerManager
RESTMapperManager *meta.RESTMapperManager
SharedFactory informers.SharedInformerFactory
Expand Down Expand Up @@ -120,7 +121,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
klog.Errorf("could not create storage manager, %v", err)
return nil, err
}
storageWrapper := cachemanager.NewStorageWrapper(storageManager)
queue := wrapper.NewQueueWithOptions()
storageWrapper := wrapper.NewStorageWrapper(storageManager, queue)
serializerManager := serializer.NewSerializerManager()
restMapperManager, err := meta.NewRESTMapperManager(options.DiskCachePath)
if err != nil {
Expand Down Expand Up @@ -153,6 +155,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
MaxRequestInFlight: options.MaxRequestInFlight,
EnableProfiling: options.EnableProfiling,
WorkingMode: workingMode,
Queue: queue,
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
Expand Down
7 changes: 6 additions & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/wrapper"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
Expand Down Expand Up @@ -130,6 +131,10 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
}
trace++

controller := wrapper.NewController(cfg.Queue, cfg.StorageWrapper)
controller.Run(ctx, wrapper.ConcurrentWorkers)
trace++

var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
Expand Down Expand Up @@ -332,7 +337,7 @@ func coordinatorRun(ctx context.Context,
klog.Errorf("coordinator could not create coordinator, %v", err)
return
}
go coor.Run()
go coor.Run(ctx)

coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/go-resty/resty/v2 v2.12.0
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/go-cmp v0.5.9
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-version v1.6.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/vishvananda/netlink v1.2.1-beta.2
go.etcd.io/etcd/api/v3 v3.5.9
go.etcd.io/etcd/client/pkg/v3 v3.5.9
go.etcd.io/etcd/client/v3 v3.5.9
golang.org/x/net v0.23.0
Expand Down Expand Up @@ -108,7 +108,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/cel-go v0.16.1 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
Expand Down Expand Up @@ -138,6 +137,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.1 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
Expand Down
11 changes: 8 additions & 3 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/wrapper"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -36,10 +38,10 @@ const (
type CacheAgent struct {
sync.Mutex
agents sets.Set[string]
store StorageWrapper
store wrapper.StorageWrapper
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(informerFactory informers.SharedInformerFactory, store wrapper.StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
store: store,
Expand Down Expand Up @@ -130,7 +132,10 @@ func (ca *CacheAgent) deleteAgentCache(deletedAgents sets.Set[string]) {
if deletedAgents.Len() > 0 {
components := deletedAgents.UnsortedList()
for i := range components {
if err := ca.store.DeleteComponentResources(components[i]); err != nil {
key, _ := ca.store.KeyFunc(storage.KeyBuildInfo{
Component: components[i],
})
if err := ca.store.Delete(key); err != nil {
klog.Errorf("could not cleanup cache for deleted agent(%s), %v", components[i], err)
} else {
klog.Infof("cleanup cache for agent(%s) successfully", components[i])
Expand Down
42 changes: 19 additions & 23 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/wrapper"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -68,7 +69,7 @@ type CacheManager interface {

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
storage wrapper.StorageWrapper
serializerManager *serializer.SerializerManager
restMapperManager *hubmeta.RESTMapperManager
cacheAgents *CacheAgent
Expand All @@ -78,7 +79,7 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
storagewrapper StorageWrapper,
storagewrapper wrapper.StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
Expand Down Expand Up @@ -336,14 +337,14 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re

comp, _ := util.ClientComponentFrom(ctx)
respContentType, _ := util.RespContentTypeFrom(ctx)
s := cm.serializerManager.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource)
if s == nil {
serializer := cm.serializerManager.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource)
if serializer == nil {
klog.Errorf("could not create serializer in saveWatchObject, %s", util.ReqInfoString(info))
return fmt.Errorf("could not create serializer in saveWatchObject, %s", util.ReqInfoString(info))
}
accessor := meta.NewAccessor()

d, err := s.WatchDecoder(r)
d, err := serializer.WatchDecoder(r)
if err != nil {
klog.Errorf("saveWatchObject ended with error, %v", err)
return err
Expand Down Expand Up @@ -484,33 +485,28 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
return cm.storeObjectWithKey(key, items[0])
} else {
// list all objects or with fieldselector/labelselector
key, _ := cm.storage.KeyFunc(storage.KeyBuildInfo{
Component: comp,
Resources: info.Resource,
Group: info.APIGroup,
Version: info.APIVersion,
})
objs := make(map[storage.Key]runtime.Object)
comp, _ := util.ClientComponentFrom(ctx)
for i := range items {
accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
name, _ := accessor.Name(items[i])
for i := 0; i < len(items); i++ {
ns, _ := accessor.Namespace(items[i])
if ns == "" {
ns = info.Namespace
}

key, _ := cm.storage.KeyFunc(storage.KeyBuildInfo{
name, _ := accessor.Name(items[i])
k, _ := cm.storage.KeyFunc(storage.KeyBuildInfo{
Component: comp,
Namespace: ns,
Name: name,
Resources: info.Resource,
Group: info.APIGroup,
Version: info.APIVersion,
Namespace: ns,
Name: name,
})
objs[key] = items[i]
objs[k] = items[i]
}
// if no objects in cloud cluster(objs is empty), it will clean the old files in the path of rootkey
return cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}, info.Namespace, objs)
return cm.storage.Replace(key, objs)
}
}

Expand Down
Loading
Loading