Skip to content

Commit

Permalink
Merge pull request #3527 from ikaven1024/proxy
Browse files Browse the repository at this point in the history
Support namespace filter in RR for search proxy
  • Loading branch information
karmada-bot authored May 22, 2023
2 parents 3151f02 + 67351f4 commit 6089fa3
Show file tree
Hide file tree
Showing 11 changed files with 1,095 additions and 45 deletions.
18 changes: 12 additions & 6 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,22 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
return err
}

resourcesByClusters := make(map[string]map[schema.GroupVersionResource]struct{})
resourcesByClusters := make(map[string]map[schema.GroupVersionResource]*store.MultiNamespace)
for _, registry := range registries {
matchedResources := make(map[schema.GroupVersionResource]struct{}, len(registry.Spec.ResourceSelectors))
matchedResources := make(map[schema.GroupVersionResource]*store.MultiNamespace, len(registry.Spec.ResourceSelectors))
for _, selector := range registry.Spec.ResourceSelectors {
gvr, err := restmapper.GetGroupVersionResource(ctl.restMapper, schema.FromAPIVersionAndKind(selector.APIVersion, selector.Kind))
if err != nil {
klog.Errorf("Failed to get gvr: %v", err)
continue
}
matchedResources[gvr] = struct{}{}

nsSelector := matchedResources[gvr]
if nsSelector == nil {
nsSelector = store.NewMultiNamespace()
matchedResources[gvr] = nsSelector
}
nsSelector.Add(selector.Namespace)
}

if len(matchedResources) == 0 {
Expand All @@ -197,11 +203,11 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
}

if _, exist := resourcesByClusters[cluster.Name]; !exist {
resourcesByClusters[cluster.Name] = make(map[schema.GroupVersionResource]struct{})
resourcesByClusters[cluster.Name] = make(map[schema.GroupVersionResource]*store.MultiNamespace)
}

for resource := range matchedResources {
resourcesByClusters[cluster.Name][resource] = struct{}{}
for resource, multiNS := range matchedResources {
resourcesByClusters[cluster.Name][resource] = multiNS
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/search/proxy/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"github.com/karmada-io/karmada/pkg/search/proxy/framework"
pluginruntime "github.com/karmada-io/karmada/pkg/search/proxy/framework/runtime"
"github.com/karmada-io/karmada/pkg/search/proxy/store"
proxytest "github.com/karmada-io/karmada/pkg/search/proxy/testing"
"github.com/karmada-io/karmada/pkg/util"
)
Expand Down Expand Up @@ -276,7 +277,7 @@ func TestController_reconcile(t *testing.T) {
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: &proxytest.MockStore{
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]struct{}) error {
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error {
for clusterName, resources := range m {
resourceNames := make([]string, 0, len(resources))
for resource := range resources {
Expand Down
16 changes: 11 additions & 5 deletions pkg/search/proxy/store/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
Expand All @@ -28,21 +29,21 @@ func newClusterCache(clusterName string, newClientFunc func() (dynamic.Interface
}
}

func (c *clusterCache) updateCache(resources map[schema.GroupVersionResource]struct{}) error {
func (c *clusterCache) updateCache(resources map[schema.GroupVersionResource]*MultiNamespace) error {
c.lock.Lock()
defer c.lock.Unlock()

// remove non-exist resources
for resource := range c.cache {
if _, exist := resources[resource]; !exist {
for resource, cache := range c.cache {
if multiNS, exist := resources[resource]; !exist || !multiNS.Equal(cache.multiNS) {
klog.Infof("Remove cache for %s %s", c.clusterName, resource.String())
c.cache[resource].stop()
delete(c.cache, resource)
}
}

// add resource cache
for resource := range resources {
for resource, multiNS := range resources {
_, exist := c.cache[resource]
if !exist {
kind, err := c.restMapper.KindFor(resource)
Expand All @@ -55,8 +56,13 @@ func (c *clusterCache) updateCache(resources map[schema.GroupVersionResource]str
}
namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace

if !namespaced && !multiNS.allNamespaces {
klog.Warningf("Namespace is invalid for %v, skip it.", kind.String())
multiNS.Add(metav1.NamespaceAll)
}

klog.Infof("Add cache for %s %s", c.clusterName, resource.String())
cache, err := newResourceCache(c.clusterName, resource, kind, namespaced, c.clientForResourceFunc(resource))
cache, err := newResourceCache(c.clusterName, resource, kind, namespaced, multiNS, c.clientForResourceFunc(resource))
if err != nil {
return err
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic"
Expand All @@ -25,7 +24,7 @@ import (

// Store is the cache for resources from multiple member clusters
type Store interface {
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error
HasResource(resource schema.GroupVersionResource) bool
GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
Stop()
Expand Down Expand Up @@ -58,7 +57,7 @@ func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error),
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]struct{}) error {
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error {
if klog.V(3).Enabled() {
start := time.Now()
defer func() {
Expand All @@ -70,12 +69,8 @@ func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema
defer c.lock.Unlock()

// remove non-exist clusters
newClusterNames := sets.NewString()
for clusterName := range resourcesByCluster {
newClusterNames.Insert(clusterName)
}
for clusterName := range c.cache {
if !newClusterNames.Has(clusterName) {
if _, exist := resourcesByCluster[clusterName]; !exist {
klog.Infof("Remove cache for cluster %s", clusterName)
c.cache[clusterName].stop()
delete(c.cache, clusterName)
Expand Down
Loading

0 comments on commit 6089fa3

Please sign in to comment.