diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 937a51b6227..70eb9829241 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -30,6 +30,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice" "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" + "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/storage/factory" yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned" @@ -68,6 +69,7 @@ type YurtHubConfiguration struct { HubAgentDummyIfName string StorageWrapper cachemanager.StorageWrapper SerializerManager *serializer.SerializerManager + RESTMapperManager *meta.RESTMapperManager TLSConfig *tls.Config MutatedMasterServiceAddr string Filters *filter.Filters @@ -89,6 +91,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { } storageWrapper := cachemanager.NewStorageWrapper(storageManager) serializerManager := serializer.NewSerializerManager() + restMapperManager := meta.NewRESTMapperManager(storageManager) hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort) proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort) @@ -141,6 +144,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { HubAgentDummyIfName: options.HubAgentDummyIfName, StorageWrapper: storageWrapper, SerializerManager: serializerManager, + RESTMapperManager: restMapperManager, MutatedMasterServiceAddr: mutatedMasterServiceAddr, Filters: filters, SharedFactory: sharedFactory, diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 255b9e689a3..ab17e5ff3a4 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -132,7 +132,7 @@ func 
Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error { trace++ klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) - cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager) + cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager) if err != nil { klog.Errorf("could not new cache manager, %v", err) return err diff --git a/pkg/yurthub/cachemanager/cache_agent_test.go b/pkg/yurthub/cachemanager/cache_agent_test.go index ca69a6804b8..09be6c75028 100644 --- a/pkg/yurthub/cachemanager/cache_agent_test.go +++ b/pkg/yurthub/cachemanager/cache_agent_test.go @@ -32,7 +32,7 @@ func TestInitCacheAgents(t *testing.T) { t.Errorf("failed to create disk storage, %v", err) } s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil) + m, _ := NewCacheManager(s, nil, nil) // default cache agents in fake store b, err := s.GetRaw(cacheAgentsKey) @@ -52,7 +52,7 @@ func TestInitCacheAgents(t *testing.T) { // add agents for next init cache _ = m.UpdateCacheAgents([]string{"agent1"}) - _, _ = NewCacheManager(s, nil) + _, _ = NewCacheManager(s, nil, nil) b2, err := s.GetRaw(cacheAgentsKey) if err != nil { @@ -81,7 +81,7 @@ func TestUpdateCacheAgents(t *testing.T) { t.Errorf("failed to create disk storage, %v", err) } s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil) + m, _ := NewCacheManager(s, nil, nil) testcases := map[string]struct { desc string diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 4399822808b..1dee13d0a19 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -28,6 +28,7 @@ import ( "strings" "sync" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" "github.com/openyurtio/openyurt/pkg/yurthub/storage" 
"github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -52,12 +53,14 @@ type CacheManager interface { UpdateCacheAgents(agents []string) error ListCacheAgents() []string CanCacheFor(req *http.Request) bool + DeleteKindFor(gvr schema.GroupVersionResource) error } type cacheManager struct { sync.RWMutex storage StorageWrapper serializerManager *serializer.SerializerManager + restMapperManager *hubmeta.RESTMapperManager cacheAgents map[string]bool listSelectorCollector map[string]string } @@ -66,10 +69,12 @@ type cacheManager struct { func NewCacheManager( storage StorageWrapper, serializerMgr *serializer.SerializerManager, + restMapperMgr *hubmeta.RESTMapperManager, ) (CacheManager, error) { cm := &cacheManager{ storage: storage, serializerManager: serializerMgr, + restMapperManager: restMapperMgr, cacheAgents: make(map[string]bool), listSelectorCollector: make(map[string]string), } @@ -148,21 +153,38 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro var gvk schema.GroupVersionKind var kind string + // If the GVR information is not recognized, return 404 not found directly + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + if _, gvk = cm.restMapperManager.KindFor(gvr); gvk.Empty() { + return nil, hubmeta.ErrGVRNotRecognized + } else { + kind = gvk.Kind + } + + // If the GVR information is recognized, return list or empty list objs, err := cm.storage.List(key) if err != nil { - return nil, err - } else if len(objs) == 0 { - gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{ - Group: info.APIGroup, - Version: info.APIVersion, - Resource: info.Resource, - }) - if err != nil { + if err != storage.ErrStorageNotFound { + return nil, err + } else if isPodKey(key) { + // because at least there will be yurt-hub pod on the node. 
+ // if no pods in cache, maybe all of pods have been deleted by accident, + // if empty object is returned, pods on node will be deleted by kubelet. + // in order to prevent the influence to business, return error here so pods + // will be kept on node. return nil, err } - kind = gvk.Kind - } else { - kind = objs[0].GetObjectKind().GroupVersionKind().Kind + } else if len(objs) != 0 { + // If restMapper's kind and object's kind are inconsistent, use the object's kind + objKind := objs[0].GetObjectKind().GroupVersionKind().Kind + if kind != objKind { + klog.Warningf("The restMapper's kind(%v) and object's kind(%v) are inconsistent ", kind, objKind) + kind = objKind + } } var listObj runtime.Object @@ -348,16 +370,13 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req }.String() accessor := meta.NewAccessor() + // Verify if DynamicRESTMapper(which store the CRD info) needs to be updated + if err := cm.restMapperManager.UpdateKind(schema.GroupVersionKind{Group: info.APIGroup, Version: info.APIVersion, Kind: kind}); err != nil { + klog.Errorf("failed to update the DynamicRESTMapper %v", err) + } + comp, _ := util.ClientComponentFrom(ctx) - // even if no objects in cloud cluster, we need to - // make up a storage that represents the no resources - // in local disk, so when cloud-edge network disconnected, - // yurthub can return empty objects instead of 404 code(not found) - if len(items) == 0 { - // list returns no objects - key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "") - return cm.storage.Create(key, nil) - } else if info.Name != "" && len(items) == 1 { + if info.Name != "" && len(items) == 1 { // list with fieldSelector=metadata.name=xxx accessor.SetKind(items[0], kind) accessor.SetAPIVersion(items[0], apiVersion) @@ -390,7 +409,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req key, _ := util.KeyFunc(comp, info.Resource, ns, name) objs[key] = items[i] } - + // if no objects in cloud 
cluster(objs is empty), it will clean the old files in the path of rootkey return cm.storage.Replace(rootKey, objs) } } @@ -436,6 +455,12 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ return err } + // Verify if DynamicRESTMapper(which store the CRD info) needs to be updated + gvk := obj.GetObjectKind().GroupVersionKind() + if err := cm.restMapperManager.UpdateKind(gvk); err != nil { + klog.Errorf("failed to update the DynamicRESTMapper %v", err) + } + if err := cm.saveOneObjectWithValidation(key, obj); err != nil { if err != storage.ErrStorageAccessConflict { return err @@ -600,3 +625,8 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool { } return true } + +// DeleteKindFor is used to delete the invalid Kind(which is not registered in the cloud) +func (cm *cacheManager) DeleteKindFor(gvr schema.GroupVersionResource) error { + return cm.restMapperManager.DeleteKindFor(gvr) +} diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 257bba9284c..8870b1877d6 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/openyurtio/openyurt/pkg/projectinfo" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/storage" @@ -41,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/filters" ) @@ -50,17 +52,18 @@ var ( ) func TestCacheGetResponse(t *testing.T) { - //storage := NewFakeStorageWrapper() dStorage, err := disk.NewDiskStorage(rootDir) if err != nil { t.Errorf("failed to 
create disk storage, %v", err) } + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, cacheAgents: make(map[string]bool), + restMapperManager: restRESTMapperMgr, } testcases := map[string]struct { @@ -506,6 +509,9 @@ func TestCacheGetResponse(t *testing.T) { if err != nil { t.Errorf("failed to delete collection: kubelet, %v", err) } + if err = yurtCM.restMapperManager.ResetRESTMapper(); err != nil { + t.Errorf("failed to delete cached DynamicRESTMapper, %v", err) + } }) } } @@ -538,12 +544,14 @@ func TestCacheWatchResponse(t *testing.T) { if err != nil { t.Errorf("failed to create disk storage, %v", err) } + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, cacheAgents: make(map[string]bool), + restMapperManager: restRESTMapperMgr, } testcases := map[string]struct { @@ -828,6 +836,9 @@ func TestCacheWatchResponse(t *testing.T) { if err != nil { t.Errorf("failed to delete collection: kubelet, %v", err) } + if err = yurtCM.restMapperManager.ResetRESTMapper(); err != nil { + t.Errorf("failed to delete cached DynamicRESTMapper, %v", err) + } }) } } @@ -840,10 +851,12 @@ func TestCacheListResponse(t *testing.T) { sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, cacheAgents: make(map[string]bool), + restMapperManager: restRESTMapperMgr, } testcases := map[string]struct { @@ -861,7 +874,6 @@ func TestCacheListResponse(t *testing.T) { err bool data map[string]struct{} } - cacheErr error }{ "list pods": { group: "", @@ -1098,7 +1110,6 @@ func 
TestCacheListResponse(t *testing.T) { path: "/api/v1/node", resource: "nodes", namespaced: false, - cacheErr: storage.ErrStorageNotFound, }, //used to test whether custom resource list can be cached correctly "cache response for list crontabs": { @@ -1213,6 +1224,37 @@ func TestCacheListResponse(t *testing.T) { }, }, }, + "list foos with no objects": { + group: "samplecontroller.k8s.io", + version: "v1", + key: "kubelet/foos", + inputObj: runtime.Object( + &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": "samplecontroller.k8s.io/v1", + "kind": "FooList", + "metadata": map[string]interface{}{ + "continue": "", + "resourceVersion": "2", + "selfLink": "/apis/samplecontroller.k8s.io/v1/foos", + }, + }, + Items: []unstructured.Unstructured{}, + }, + ), + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", + resource: "foos", + namespaced: false, + expectResult: struct { + err bool + data map[string]struct{} + }{ + data: map[string]struct{}{}, + }, + }, } resolver := newTestRequestInfoResolver() @@ -1264,8 +1306,13 @@ func TestCacheListResponse(t *testing.T) { objs, err := sWrapper.List(tt.key) if err != nil { - if err != tt.cacheErr { - t.Errorf("expect error %v, but got %v", tt.cacheErr, err) + // If error is storage.ErrStorageNotFound, it means that no object is cached in the hard disk + if err == storage.ErrStorageNotFound { + if len(tt.expectResult.data) != 0 { + t.Errorf("expect %v objects, but get nothing.", len(tt.expectResult.data)) + } + } else { + t.Errorf("got unexpected error %v", err) } } @@ -1277,6 +1324,9 @@ func TestCacheListResponse(t *testing.T) { if err != nil { t.Errorf("failed to delete collection: kubelet, %v", err) } + if err = yurtCM.restMapperManager.ResetRESTMapper(); err != nil { + t.Errorf("failed to delete cached DynamicRESTMapper, %v", err) + } }) } } @@ -1288,10 +1338,12 @@ func TestQueryCacheForGet(t *testing.T) { } sWrapper := 
NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, cacheAgents: make(map[string]bool), + restMapperManager: restRESTMapperMgr, } testcases := map[string]struct { @@ -1818,15 +1870,18 @@ func TestQueryCacheForList(t *testing.T) { } sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) yurtCM := &cacheManager{ storage: sWrapper, serializerManager: serializerM, cacheAgents: make(map[string]bool), + restMapperManager: restRESTMapperMgr, } testcases := map[string]struct { keyPrefix string noObjs bool + cachedKind string inputObj []runtime.Object userAgent string accept string @@ -1834,11 +1889,11 @@ func TestQueryCacheForList(t *testing.T) { path string namespaced bool expectResult struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} } - queryErr error }{ "list with no user agent": { accept: "application/json", @@ -1846,9 +1901,10 @@ func TestQueryCacheForList(t *testing.T) { path: "/api/v1/namespaces/default/pods", namespaced: true, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ err: true, }, @@ -1896,9 +1952,10 @@ func TestQueryCacheForList(t *testing.T) { path: "/api/v1/namespaces/default/pods", namespaced: true, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ rv: "5", data: map[string]struct{}{ @@ -1958,9 +2015,10 @@ func TestQueryCacheForList(t *testing.T) { path: "/api/v1/nodes", namespaced: false, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ rv: "12", 
data: map[string]struct{}{ @@ -1971,10 +2029,46 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, + "list runtimeclass": { + keyPrefix: "kubelet/runtimeclasses", + noObjs: true, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/apis/node.k8s.io/v1beta1/runtimeclasses", + namespaced: false, + expectResult: struct { + err bool + queryErr error + rv string + data map[string]struct{} + }{ + data: map[string]struct{}{}, + }, + }, + "list pods and no pods in cache": { + keyPrefix: "kubelet/pods", + noObjs: true, + userAgent: "kubelet", + accept: "application/json", + verb: "GET", + path: "/api/v1/pods", + namespaced: false, + expectResult: struct { + err bool + queryErr error + rv string + data map[string]struct{} + }{ + err: true, + queryErr: storage.ErrStorageNotFound, + }, + }, //used to test whether the query local Custom Resource list request can be handled correctly "list crontabs": { - keyPrefix: "kubelet/crontabs/default", + keyPrefix: "kubelet/crontabs/default", + cachedKind: "stable.example.com/v1/CronTab", inputObj: []runtime.Object{ &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -2016,9 +2110,10 @@ func TestQueryCacheForList(t *testing.T) { path: "/apis/stable.example.com/v1/namespaces/default/crontabs", namespaced: true, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ rv: "5", data: map[string]struct{}{ @@ -2029,7 +2124,8 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list foos": { - keyPrefix: "kubelet/foos", + keyPrefix: "kubelet/foos", + cachedKind: "samplecontroller.k8s.io/v1/Foo", inputObj: []runtime.Object{ &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -2068,9 +2164,10 @@ func TestQueryCacheForList(t *testing.T) { path: "/apis/samplecontroller.k8s.io/v1/foos", namespaced: false, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv 
string + data map[string]struct{} }{ rv: "5", data: map[string]struct{}{ @@ -2080,38 +2177,39 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - "list runtimeclass": { - keyPrefix: "kubelet/runtimeclasses", + "list foos with no objs": { + keyPrefix: "kubelet/foos", noObjs: true, + cachedKind: "samplecontroller.k8s.io/v1/Foo", userAgent: "kubelet", accept: "application/json", verb: "GET", - path: "/apis/node.k8s.io/v1beta1/runtimeclasses", + path: "/apis/samplecontroller.k8s.io/v1/foos", namespaced: false, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ data: map[string]struct{}{}, }, }, - "list pods and no pods in cache": { - keyPrefix: "kubelet/pods", - noObjs: true, + "list unregistered resources": { userAgent: "kubelet", accept: "application/json", verb: "GET", - path: "/api/v1/pods", + path: "/apis/sample.k8s.io/v1/abcs", namespaced: false, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ - err: true, + err: true, + queryErr: hubmeta.ErrGVRNotRecognized, }, - queryErr: storage.ErrStorageNotFound, }, "list resources not exist": { userAgent: "kubelet", @@ -2120,13 +2218,13 @@ func TestQueryCacheForList(t *testing.T) { path: "/api/v1/nodes", namespaced: false, expectResult: struct { - err bool - rv string - data map[string]struct{} + err bool + queryErr error + rv string + data map[string]struct{} }{ - err: true, + data: map[string]struct{}{}, }, - queryErr: storage.ErrStorageNotFound, }, } @@ -2140,8 +2238,16 @@ func TestQueryCacheForList(t *testing.T) { _ = sWrapper.Create(key, tt.inputObj[i]) } - if tt.noObjs { - _ = sWrapper.Create(tt.keyPrefix, nil) + // It is used to simulate caching GVK information. If the caching is successful, + // the next process can obtain the correct GVK information when constructing an empty List. 
+ if tt.cachedKind != "" { + info := strings.Split(tt.cachedKind, hubmeta.SepForGVR) + gvk := schema.GroupVersionKind{ + Group: info[0], + Version: info[1], + Kind: info[2], + } + _ = yurtCM.restMapperManager.UpdateKind(gvk) } req, _ := http.NewRequest(tt.verb, tt.path, nil) @@ -2177,8 +2283,8 @@ func TestQueryCacheForList(t *testing.T) { t.Errorf("Got no error, but expect err") } - if tt.queryErr != nil && tt.queryErr != err { - t.Errorf("expect err %v, but got %v", tt.queryErr, err) + if tt.expectResult.queryErr != nil && tt.expectResult.queryErr != err { + t.Errorf("expect err %v, but got %v", tt.expectResult.queryErr, err) } } else { if err != nil { @@ -2197,6 +2303,10 @@ func TestQueryCacheForList(t *testing.T) { if err != nil { t.Errorf("failed to delete collection: kubelet, %v", err) } + + if err = yurtCM.restMapperManager.ResetRESTMapper(); err != nil { + t.Errorf("failed to delete cached DynamicRESTMapper, %v", err) + } }) } } @@ -2243,7 +2353,7 @@ func TestCanCacheFor(t *testing.T) { t.Errorf("failed to create disk storage, %v", err) } s := NewStorageWrapper(dStorage) - m, _ := NewCacheManager(s, nil) + m, _ := NewCacheManager(s, nil, nil) type proxyRequest struct { userAgent string diff --git a/pkg/yurthub/kubernetes/meta/restmapper.go b/pkg/yurthub/kubernetes/meta/restmapper.go new file mode 100644 index 00000000000..226208a8cc4 --- /dev/null +++ b/pkg/yurthub/kubernetes/meta/restmapper.go @@ -0,0 +1,251 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package meta + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + + "github.com/openyurtio/openyurt/pkg/yurthub/storage" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog" +) + +const ( + CacheDynamicRESTMapperKey = "_internal/restmapper/cache-crd-restmapper.conf" + SepForGVR = "/" +) + +var ( + // unsafeSchemeRESTMapper is used to store the mapping relationship between GVK and GVR in scheme + // It is not updatable and is only used for the judgment of scheme resources + unsafeSchemeRESTMapper = NewDefaultRESTMapperFromScheme() + ErrGVRNotRecognized = errors.New("GroupVersionResource is not recognized") +) + +// RESTMapperManager is responsible for managing different kind of RESTMapper +type RESTMapperManager struct { + sync.RWMutex + storage storage.Store + // UnsafeDefaultRESTMapper is used to save the GVK and GVR mapping relationships of built-in resources + unsafeDefaultRESTMapper *meta.DefaultRESTMapper + // dynamicRESTMapper is used to save the GVK and GVR mapping relationships of Custom Resources + dynamicRESTMapper map[schema.GroupVersionResource]schema.GroupVersionKind +} + +func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper { + s := scheme.Scheme + defaultGroupVersions := s.PrioritizedVersionsAllGroups() + mapper := meta.NewDefaultRESTMapper(defaultGroupVersions) + // enumerate all supported versions, get the kinds, and register with the mapper how to address + // our resources. 
+ for _, gv := range defaultGroupVersions { + for kind := range s.KnownTypes(gv) { + // Only need to process non-list resources + if !strings.HasSuffix(kind, "List") { + // Since RESTMapper is only used for mapping GVR to GVK information, + // the scope field is not involved in actual use, + // so all scope are currently set to meta.RESTScopeNamespace + scope := meta.RESTScopeNamespace + mapper.Add(gv.WithKind(kind), scope) + } + } + } + return mapper +} + +func NewRESTMapperManager(storage storage.Store) *RESTMapperManager { + var dm map[schema.GroupVersionResource]schema.GroupVersionKind + // Recover the mapping relationship between GVR and GVK from the hard disk + b, err := storage.Get(CacheDynamicRESTMapperKey) + if err == nil && len(b) != 0 { + dm = unmarshalDynamicRESTMapper(b) + klog.Infof("reset DynamicRESTMapper to %v", dm) + } else { + dm = make(map[schema.GroupVersionResource]schema.GroupVersionKind) + klog.Infof("initialize an empty DynamicRESTMapper") + } + + return &RESTMapperManager{ + unsafeDefaultRESTMapper: unsafeSchemeRESTMapper, + dynamicRESTMapper: dm, + storage: storage, + } +} + +// Obtain gvk according to gvr in dynamicRESTMapper +func (rm *RESTMapperManager) dynamicKindFor(gvr schema.GroupVersionResource) (schema.GroupVersionKind, error) { + hasResource := len(gvr.Resource) > 0 + hasGroup := len(gvr.Group) > 0 + hasVersion := len(gvr.Version) > 0 + + if !hasResource || !hasGroup { + return schema.GroupVersionKind{}, fmt.Errorf("a resource and group must be present, got: %v", gvr) + } + + rm.RLock() + defer rm.RUnlock() + if hasVersion { + // fully qualified. 
Find the exact match
+		kind, exists := rm.dynamicRESTMapper[gvr]
+		if exists {
+			return kind, nil
+		}
+	} else {
+		requestedGroupResource := gvr.GroupResource()
+		for currResource, currKind := range rm.dynamicRESTMapper {
+			if currResource.GroupResource() == requestedGroupResource {
+				return currKind, nil
+			}
+		}
+	}
+	return schema.GroupVersionKind{}, fmt.Errorf("no matches for %v", gvr)
+}
+
+// Used to delete the mapping relationship between GVR and GVK in dynamicRESTMapper
+func (rm *RESTMapperManager) deleteKind(gvk schema.GroupVersionKind) error {
+	kindName := strings.TrimSuffix(gvk.Kind, "List")
+	plural, singular := meta.UnsafeGuessKindToResource(gvk.GroupVersion().WithKind(kindName))
+	rm.Lock()
+	delete(rm.dynamicRESTMapper, plural)
+	delete(rm.dynamicRESTMapper, singular)
+	rm.Unlock()
+	return rm.updateCachedDynamicRESTMapper()
+}
+
+// Used to update local files saved on disk
+func (rm *RESTMapperManager) updateCachedDynamicRESTMapper() error {
+	if rm.storage == nil {
+		return nil
+	}
+	rm.RLock()
+	d, err := marshalDynamicRESTMapper(rm.dynamicRESTMapper)
+	rm.RUnlock()
+	if err != nil {
+		return err
+	}
+	return rm.storage.Update(CacheDynamicRESTMapperKey, d)
+}
+
+// KindFor is used to find GVK based on GVR information.
+// 1. return true means the GVR is a built-in resource in the scheme.
+// 2.1 return false and non-empty GVK means the GVR is a custom resource
+// 2.2 return false and empty GVK means the GVR is an unknown resource. 
+func (rm *RESTMapperManager) KindFor(gvr schema.GroupVersionResource) (bool, schema.GroupVersionKind) { + gvk, kindErr := rm.unsafeDefaultRESTMapper.KindFor(gvr) + if kindErr != nil { + gvk, kindErr = rm.dynamicKindFor(gvr) + if kindErr != nil { + return false, schema.GroupVersionKind{} + } + return false, gvk + } + return true, gvk +} + +// DeleteKindFor is used to delete the GVK information related to the incoming gvr +func (rm *RESTMapperManager) DeleteKindFor(gvr schema.GroupVersionResource) error { + isScheme, gvk := rm.KindFor(gvr) + if !isScheme && !gvk.Empty() { + return rm.deleteKind(gvk) + } + return nil +} + +// UpdateKind is used to verify and add the GVK and GVR mapping relationships of new Custom Resource +func (rm *RESTMapperManager) UpdateKind(gvk schema.GroupVersionKind) error { + kindName := strings.TrimSuffix(gvk.Kind, "List") + gvk = gvk.GroupVersion().WithKind(kindName) + plural, singular := meta.UnsafeGuessKindToResource(gvk.GroupVersion().WithKind(kindName)) + // If it is not a built-in resource and it is not stored in DynamicRESTMapper, add it to DynamicRESTMapper + isScheme, t := rm.KindFor(singular) + if !isScheme && t.Empty() { + rm.Lock() + rm.dynamicRESTMapper[singular] = gvk + rm.dynamicRESTMapper[plural] = gvk + rm.Unlock() + return rm.updateCachedDynamicRESTMapper() + } + return nil +} + +// ResetRESTMapper is used to clean up all cached GVR/GVK information in DynamicRESTMapper, +// and delete the corresponding file in the disk (cache-crd-restmapper.conf), it should be used carefully. 
+func (rm *RESTMapperManager) ResetRESTMapper() error { + rm.dynamicRESTMapper = make(map[schema.GroupVersionResource]schema.GroupVersionKind) + err := rm.storage.DeleteCollection(CacheDynamicRESTMapperKey) + if err != nil { + return err + } + return nil +} + +// marshalDynamicRESTMapper converts dynamicRESTMapper to the []byte format, which is used to save data to disk +func marshalDynamicRESTMapper(dynamicRESTMapper map[schema.GroupVersionResource]schema.GroupVersionKind) ([]byte, error) { + cacheMapper := make(map[string]string, len(dynamicRESTMapper)) + for currResource, currKind := range dynamicRESTMapper { + //key: Group/Version/Resource, value: Kind + k := strings.Join([]string{currResource.Group, currResource.Version, currResource.Resource}, SepForGVR) + cacheMapper[k] = currKind.Kind + } + return json.Marshal(cacheMapper) +} + +// unmarshalDynamicRESTMapper converts bytes of data to map[schema.GroupVersionResource]schema.GroupVersionKind format, used to recover data from disk +func unmarshalDynamicRESTMapper(data []byte) map[schema.GroupVersionResource]schema.GroupVersionKind { + dm := make(map[schema.GroupVersionResource]schema.GroupVersionKind) + cacheMapper := make(map[string]string) + err := json.Unmarshal(data, &cacheMapper) + if err != nil { + klog.Errorf("failed to get cached CRDRESTMapper, %v", err) + } + + for gvrString, kindString := range cacheMapper { + localInfo := strings.Split(gvrString, SepForGVR) + if len(localInfo) != 3 { + klog.Errorf("This %s is not standardized, ignore this gvk", gvrString) + continue + } + gvr := schema.GroupVersionResource{ + Group: localInfo[0], + Version: localInfo[1], + Resource: localInfo[2], + } + gvk := schema.GroupVersionKind{ + Group: localInfo[0], + Version: localInfo[1], + Kind: kindString, + } + dm[gvr] = gvk + } + return dm +} + +// IsSchemeResource is used to determine whether gvr is a built-in resource +func IsSchemeResource(gvr schema.GroupVersionResource) bool { + _, kindErr := 
unsafeSchemeRESTMapper.KindFor(gvr) + if kindErr != nil { + return false + } + return true +} diff --git a/pkg/yurthub/kubernetes/meta/restmapper_test.go b/pkg/yurthub/kubernetes/meta/restmapper_test.go new file mode 100644 index 00000000000..be7a96f2400 --- /dev/null +++ b/pkg/yurthub/kubernetes/meta/restmapper_test.go @@ -0,0 +1,238 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package meta + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var rootDir = "/tmp/restmapper" + +func TestCreateRESTMapperManager(t *testing.T) { + dStorage, err := disk.NewDiskStorage(rootDir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + defer func() { + if err := os.RemoveAll(rootDir); err != nil { + t.Errorf("Unable to clean up test directory %q: %v", rootDir, err) + } + }() + + // initialize an empty DynamicRESTMapper + yurtHubRESTMapperManager := NewRESTMapperManager(dStorage) + if yurtHubRESTMapperManager.dynamicRESTMapper == nil || len(yurtHubRESTMapperManager.dynamicRESTMapper) != 0 { + t.Errorf("failed to initialize an empty dynamicRESTMapper, %v", err) + } + + // reset yurtHubRESTMapperManager + if err := yurtHubRESTMapperManager.ResetRESTMapper(); err != nil { + t.Fatalf("failed to reset yurtHubRESTMapperManager , %v", err) + } + + // initialize an Non-empty DynamicRESTMapper + 
// pre-cache the CRD information to the hard disk + cachedDynamicRESTMapper := map[string]string{ + "samplecontroller.k8s.io/v1alpha1/foo": "Foo", + "samplecontroller.k8s.io/v1alpha1/foos": "Foo", + } + d, err := json.Marshal(cachedDynamicRESTMapper) + if err != nil { + t.Errorf("failed to serialize dynamicRESTMapper, %v", err) + } + err = dStorage.Update(CacheDynamicRESTMapperKey, d) + if err != nil { + t.Fatalf("failed to stored dynamicRESTMapper, %v", err) + } + + // Determine whether the restmapper in the memory is the same as the information written to the disk + yurtHubRESTMapperManager = NewRESTMapperManager(dStorage) + // get the CRD information in memory + m := yurtHubRESTMapperManager.dynamicRESTMapper + gotMapper := dynamicRESTMapperToString(m) + + if !compareDynamicRESTMapper(gotMapper, cachedDynamicRESTMapper) { + t.Errorf("Got mapper: %v, expect mapper: %v", gotMapper, cachedDynamicRESTMapper) + } + + if err := yurtHubRESTMapperManager.ResetRESTMapper(); err != nil { + t.Fatalf("failed to reset yurtHubRESTMapperManager , %v", err) + } +} + +func TestUpdateRESTMapper(t *testing.T) { + dStorage, err := disk.NewDiskStorage(rootDir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + defer func() { + if err := os.RemoveAll(rootDir); err != nil { + t.Errorf("Unable to clean up test directory %q: %v", rootDir, err) + } + }() + yurtHubRESTMapperManager := NewRESTMapperManager(dStorage) + testcases := map[string]struct { + cachedCRD []schema.GroupVersionKind + addCRD schema.GroupVersionKind + deleteCRD schema.GroupVersionResource + expectRESTMapper map[string]string + }{ + "add the first CRD": { + cachedCRD: []schema.GroupVersionKind{}, + addCRD: schema.GroupVersionKind{Group: "samplecontroller.k8s.io", Version: "v1alpha1", Kind: "Foo"}, + expectRESTMapper: map[string]string{ + "samplecontroller.k8s.io/v1alpha1/foo": "Foo", + "samplecontroller.k8s.io/v1alpha1/foos": "Foo", + }, + }, + + "update with another CRD": { + cachedCRD: 
[]schema.GroupVersionKind{{Group: "samplecontroller.k8s.io", Version: "v1alpha1", Kind: "Foo"}}, + addCRD: schema.GroupVersionKind{Group: "stable.example.com", Version: "v1", Kind: "CronTab"}, + expectRESTMapper: map[string]string{ + "samplecontroller.k8s.io/v1alpha1/foo": "Foo", + "samplecontroller.k8s.io/v1alpha1/foos": "Foo", + "stable.example.com/v1/crontab": "CronTab", + "stable.example.com/v1/crontabs": "CronTab", + }, + }, + "delete one CRD": { + cachedCRD: []schema.GroupVersionKind{ + {Group: "samplecontroller.k8s.io", Version: "v1alpha1", Kind: "Foo"}, + {Group: "stable.example.com", Version: "v1", Kind: "CronTab"}, + }, + deleteCRD: schema.GroupVersionResource{Group: "stable.example.com", Version: "v1", Resource: "crontabs"}, + expectRESTMapper: map[string]string{ + "samplecontroller.k8s.io/v1alpha1/foo": "Foo", + "samplecontroller.k8s.io/v1alpha1/foos": "Foo", + }, + }, + } + for k, tt := range testcases { + t.Run(k, func(t *testing.T) { + // initialize the cache CRD + for _, gvk := range tt.cachedCRD { + err := yurtHubRESTMapperManager.UpdateKind(gvk) + if err != nil { + t.Errorf("failed to initialize the restmapper, %v", err) + } + } + // add CRD information + if !tt.addCRD.Empty() { + err := yurtHubRESTMapperManager.UpdateKind(tt.addCRD) + if err != nil { + t.Errorf("failed to add CRD information, %v", err) + } + } else { + // delete CRD information + err := yurtHubRESTMapperManager.DeleteKindFor(tt.deleteCRD) + if err != nil { + t.Errorf("failed to delete CRD information, %v", err) + } + } + + // verify the CRD information in memory + m := yurtHubRESTMapperManager.dynamicRESTMapper + memoryMapper := dynamicRESTMapperToString(m) + + if !compareDynamicRESTMapper(memoryMapper, tt.expectRESTMapper) { + t.Errorf("Got mapper: %v, expect mapper: %v", memoryMapper, tt.expectRESTMapper) + } + + // verify the CRD information in disk + b, err := dStorage.Get(CacheDynamicRESTMapperKey) + if err != nil { + t.Fatalf("failed to get cached CRD information, %v", err) 
+ } + cacheMapper := make(map[string]string) + err = json.Unmarshal(b, &cacheMapper) + if err != nil { + t.Errorf("failed to decode the cached dynamicRESTMapper, %v", err) + } + + if !compareDynamicRESTMapper(cacheMapper, tt.expectRESTMapper) { + t.Errorf("cached mapper: %v, expect mapper: %v", cacheMapper, tt.expectRESTMapper) + } + }) + } + if err := yurtHubRESTMapperManager.ResetRESTMapper(); err != nil { + t.Fatalf("failed to reset yurtHubRESTMapperManager , %v", err) + } +} + +func TestResetRESTMapper(t *testing.T) { + dStorage, err := disk.NewDiskStorage(rootDir) + if err != nil { + t.Errorf("failed to create disk storage, %v", err) + } + defer func() { + if err := os.RemoveAll(rootDir); err != nil { + t.Errorf("Unable to clean up test directory %q: %v", rootDir, err) + } + }() + // initialize the RESTMapperManager + yurtHubRESTMapperManager := NewRESTMapperManager(dStorage) + err = yurtHubRESTMapperManager.UpdateKind(schema.GroupVersionKind{Group: "stable.example.com", Version: "v1", Kind: "CronTab"}) + if err != nil { + t.Errorf("failed to initialize the restmapper, %v", err) + } + + // reset the RESTMapperManager + if err := yurtHubRESTMapperManager.ResetRESTMapper(); err != nil { + t.Errorf("failed to reset the restmapper, %v", err) + } + + // Verify reset result + if len(yurtHubRESTMapperManager.dynamicRESTMapper) != 0 { + t.Error("The cached GVR/GVK information in memory is not cleaned up.") + } else if _, err := os.Stat(filepath.Join(rootDir, CacheDynamicRESTMapperKey)); !os.IsNotExist(err) { + t.Error("The cached GVR/GVK information in disk is not deleted.") + } +} + +func compareDynamicRESTMapper(gotMapper map[string]string, expectedMapper map[string]string) bool { + if len(gotMapper) != len(expectedMapper) { + return false + } + + for gvr, kind := range gotMapper { + k, exists := expectedMapper[gvr] + if !exists || k != kind { + return false + } + } + + return true +} + +func dynamicRESTMapperToString(m 
map[schema.GroupVersionResource]schema.GroupVersionKind) map[string]string { + resultMapper := make(map[string]string, len(m)) + for currResource, currKind := range m { + //key: Group/Version/Resource, value: Kind + k := strings.Join([]string{currResource.Group, currResource.Version, currResource.Resource}, SepForGVR) + resultMapper[k] = currKind.Kind + } + return resultMapper +} diff --git a/pkg/yurthub/kubernetes/serializer/serializer.go b/pkg/yurthub/kubernetes/serializer/serializer.go index 3ad07c044c6..a37f35e9e86 100644 --- a/pkg/yurthub/kubernetes/serializer/serializer.go +++ b/pkg/yurthub/kubernetes/serializer/serializer.go @@ -24,7 +24,7 @@ import ( "strings" "sync" - "k8s.io/apimachinery/pkg/api/meta" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" @@ -42,26 +42,6 @@ import ( // YurtHubSerializer is a global serializer manager for yurthub var YurtHubSerializer = NewSerializerManager() -// UnsafeDefaultRESTMapper is only used to check whether the GVK is in the scheme according to the GVR information -var UnsafeDefaultRESTMapper = NewDefaultRESTMapperFromScheme() - -func NewDefaultRESTMapperFromScheme() *meta.DefaultRESTMapper { - s := scheme.Scheme - defaultGroupVersions := s.PrioritizedVersionsAllGroups() - mapper := meta.NewDefaultRESTMapper(defaultGroupVersions) - // enumerate all supported versions, get the kinds, and register with the mapper how to address - // our resources. 
- for _, gv := range defaultGroupVersions { - for kind := range s.KnownTypes(gv) { - //Since RESTMapper is only used for mapping GVR to GVK information, - //the scope field is not involved in actual use, so all scope are currently set to meta.RESTScopeNamespace - scope := meta.RESTScopeNamespace - mapper.Add(gv.WithKind(kind), scope) - } - } - return mapper -} - type yurtClientNegotiator struct { recognized bool runtime.ClientNegotiator @@ -99,11 +79,9 @@ func NewSerializerManager() *SerializerManager { // GetNegotiatedSerializer returns an NegotiatedSerializer object based on GroupVersionResource func (sm *SerializerManager) GetNegotiatedSerializer(gvr schema.GroupVersionResource) runtime.NegotiatedSerializer { - _, kindErr := UnsafeDefaultRESTMapper.KindFor(gvr) - if kindErr == nil { + if isScheme := hubmeta.IsSchemeResource(gvr); isScheme { return sm.NegotiatedSerializer } - return sm.UnstructuredNegotiatedSerializer } @@ -183,7 +161,7 @@ func (s UnstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.Serial //EncoderForVersion do nothing, but returns a encoder, //if the object is unstructured, the encoder will encode object without conversion -func (s UnstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { +func (s UnstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, _ runtime.GroupVersioner) runtime.Encoder { return encoder } @@ -215,13 +193,11 @@ func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, // genClientNegotiator creates a ClientNegotiator for specified GroupVersionResource and gvr is recognized or not func (sm *SerializerManager) genClientNegotiator(gvr schema.GroupVersionResource) (runtime.ClientNegotiator, bool) { - _, kindErr := UnsafeDefaultRESTMapper.KindFor(gvr) - if kindErr == nil { + if isScheme := hubmeta.IsSchemeResource(gvr); isScheme { return runtime.NewClientNegotiator(sm.NegotiatedSerializer, 
gvr.GroupVersion()), true } klog.Infof("%#+v is not found in client-go runtime scheme", gvr) return runtime.NewClientNegotiator(sm.UnstructuredNegotiatedSerializer, gvr.GroupVersion()), false - } // Serializer is used for transforming objects into a serialized format and back for cache manager of hub agent. @@ -287,7 +263,6 @@ func (s *Serializer) Decode(b []byte) (runtime.Object, error) { if err != nil { return nil, err } - return out, nil } diff --git a/pkg/yurthub/proxy/local/local.go b/pkg/yurthub/proxy/local/local.go index bd67b87881b..328d2a86545 100644 --- a/pkg/yurthub/proxy/local/local.go +++ b/pkg/yurthub/proxy/local/local.go @@ -25,6 +25,7 @@ import ( "time" manager "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/util" @@ -204,7 +205,7 @@ func (lp *LocalProxy) localReqCache(w http.ResponseWriter, req *http.Request) er } obj, err := lp.cacheMgr.QueryCache(req) - if err == storage.ErrStorageNotFound { + if err == storage.ErrStorageNotFound || err == hubmeta.ErrGVRNotRecognized { klog.Errorf("object not found for %s", util.ReqString(req)) reqInfo, _ := apirequest.RequestInfoFrom(req.Context()) return errors.NewNotFound(schema.GroupResource{Group: reqInfo.APIGroup, Resource: reqInfo.Resource}, reqInfo.Name) diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index 868bcb80e5a..f49766575d7 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" + hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" @@ -34,6 +35,7 
@@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/request" @@ -57,7 +59,7 @@ func TestServeHTTPForWatch(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) fn := func() bool { return false @@ -145,7 +147,7 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) cnt := 0 fn := func() bool { @@ -227,7 +229,7 @@ func TestServeHTTPForPost(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) fn := func() bool { return false @@ -303,7 +305,7 @@ func TestServeHTTPForDelete(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) fn := func() bool { return false @@ -366,7 +368,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, nil) fn := func() bool { return false 
@@ -501,7 +503,8 @@ func TestServeHTTPForListReqCache(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM) + restRESTMapperMgr := hubmeta.NewRESTMapperManager(dStorage) + cacheM, _ := cachemanager.NewCacheManager(sWrapper, serializerM, restRESTMapperMgr) fn := func() bool { return false @@ -510,15 +513,16 @@ func TestServeHTTPForListReqCache(t *testing.T) { lp := NewLocalProxy(cacheM, fn) testcases := map[string]struct { - userAgent string - keyPrefix string - inputObj []runtime.Object - accept string - verb string - path string - resource string - code int - expectD struct { + userAgent string + keyPrefix string + preCachedObj []runtime.Object + accept string + verb string + path string + resource string + gvr schema.GroupVersionResource + code int + expectD struct { rv string data map[string]struct{} } @@ -526,7 +530,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { "list pods request": { userAgent: "kubelet", keyPrefix: "kubelet/pods/default", - inputObj: []runtime.Object{ + preCachedObj: []runtime.Object{ &v1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -578,6 +582,23 @@ func TestServeHTTPForListReqCache(t *testing.T) { }, }, }, + "list unregistered resource(Foo) request": { + userAgent: "kubelet", + keyPrefix: "kubelet/foos", + preCachedObj: []runtime.Object{}, + accept: "application/json", + verb: "GET", + path: "/api/samplecontroller.k8s.io/v1/foos", + resource: "foos", + gvr: schema.GroupVersionResource{Group: "samplecontroller.k8s.io", Version: "v1", Resource: "foo"}, + code: http.StatusNotFound, + expectD: struct { + rv string + data map[string]struct{} + }{ + data: map[string]struct{}{}, + }, + }, } resolver := newTestRequestInfoResolver() @@ -585,10 +606,10 @@ func TestServeHTTPForListReqCache(t *testing.T) { t.Run(k, func(t *testing.T) { s := serializerM.CreateSerializer(tt.accept, "", "v1", tt.resource) 
accessor := meta.NewAccessor() - for i := range tt.inputObj { - name, _ := accessor.Name(tt.inputObj[i]) + for i := range tt.preCachedObj { + name, _ := accessor.Name(tt.preCachedObj[i]) key := filepath.Join(tt.keyPrefix, name) - _ = sWrapper.Update(key, tt.inputObj[i]) + _ = sWrapper.Update(key, tt.preCachedObj[i]) } req, _ := http.NewRequest(tt.verb, tt.path, nil) @@ -612,6 +633,13 @@ func TestServeHTTPForListReqCache(t *testing.T) { resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) result := resp.Result() + // For unregistered resources, server should return 404 not found + if result.StatusCode == http.StatusNotFound { + if _, gvk := restRESTMapperMgr.KindFor(tt.gvr); !gvk.Empty() { + t.Errorf("this resources %v is registered, but it should return 404 for unregistered resource", tt.gvr) + } + return + } if result.StatusCode != tt.code { t.Errorf("got status code %d, but expect %d", result.StatusCode, tt.code) } diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index 9f4b66f0b74..9b104afcdb8 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -30,6 +30,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/transport" "github.com/openyurtio/openyurt/pkg/yurthub/util" + "k8s.io/apimachinery/pkg/runtime/schema" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog" ) @@ -142,6 +143,20 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { resp.Body = rc } + } else if resp.StatusCode == http.StatusNotFound && info.Verb == "list" { + // 404 Not Found: The CRD may have been unregistered and should be updated locally as well. + // Other types of requests may return a 404 response for other reasons (for example, getting a pod that doesn't exist). + // And the main purpose is to return 404 when list an unregistered resource locally, so here only consider the list request. 
+ gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + err := rp.cacheMgr.DeleteKindFor(gvr) + if err != nil { + klog.Errorf("failed: %v", err) + } } return nil } diff --git a/pkg/yurthub/storage/disk/storage.go b/pkg/yurthub/storage/disk/storage.go index 4128938c25d..e833d42a82e 100644 --- a/pkg/yurthub/storage/disk/storage.go +++ b/pkg/yurthub/storage/disk/storage.go @@ -345,11 +345,10 @@ func (ds *diskStorage) Update(key string, contents []byte) error { } // Replace will delete all files under rootKey dir and create new files with contents. +// Note: when the contents are empty and the dir already exists, the create function will clean the current dir func (ds *diskStorage) Replace(rootKey string, contents map[string][]byte) error { if rootKey == "" { return storage.ErrKeyIsEmpty - } else if len(contents) == 0 { - return storage.ErrKeyHasNoContent } for key := range contents { @@ -380,11 +379,13 @@ func (ds *diskStorage) Replace(rootKey string, contents map[string][]byte) error // 2. create new file with contents // TODO: if error happens, we may need retry mechanism, or add some mechanism to do consistency check. 
- for key, data := range contents { - err := ds.create(key, data) - if err != nil { - klog.Errorf("failed to create %s in replace, %v", key, err) - continue + if len(contents) != 0 { + for key, data := range contents { + err := ds.create(key, data) + if err != nil { + klog.Errorf("failed to create %s in replace, %v", key, err) + continue + } } } diff --git a/pkg/yurthub/storage/disk/storage_test.go b/pkg/yurthub/storage/disk/storage_test.go index fe0ab229f2d..6fce4169e12 100644 --- a/pkg/yurthub/storage/disk/storage_test.go +++ b/pkg/yurthub/storage/disk/storage_test.go @@ -648,6 +648,95 @@ func TestUpdate(t *testing.T) { } } +func TestReplace(t *testing.T) { + testcases := map[string]struct { + listKey string + preCreatedKeys map[string]string + replaceKeys map[string]string + result map[string]string + replaceErr error + }{ + "replace old files with contents": { + listKey: "kubelet/pods/default", + preCreatedKeys: map[string]string{ + "kubelet/pods/default/foo1": "test-pod1", + "kubelet/pods/default/foo2": "test-pod2", + }, + replaceKeys: map[string]string{ + "kubelet/pods/default/foo3": "test-pod3", + }, + result: map[string]string{ + "kubelet/pods/default/foo3": "test-pod3", + }, + }, + "replace old files with empty contents": { + listKey: "kubelet/pods/default", + preCreatedKeys: map[string]string{ + "kubelet/pods/default/foo1": "test-pod11", + "kubelet/pods/default/foo2": "test-pod21", + }, + replaceKeys: map[string]string{}, + result: map[string]string{}, + }, + } + s, err := NewDiskStorage(testDir) + if err != nil { + t.Fatalf("unable to new disk storage, %v", err) + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + for key, data := range tc.preCreatedKeys { + err = s.Create(key, []byte(data)) + if err != nil { + t.Errorf("%s: Got error %v, wanted successful create %s", k, err, key) + } + } + + contents := make(map[string][]byte, len(tc.replaceKeys)) + for key, data := range tc.replaceKeys { + contents[key] = []byte(data) + } + + err = 
s.Replace(tc.listKey, contents) + if err != nil { + if tc.replaceErr != err { + t.Errorf("%s: expect error %v, but got %v", k, tc.replaceErr, err) + } + } + + if keys, err := s.ListKeys(tc.listKey); err == nil { + if len(keys) != len(tc.result) { + t.Errorf("expect the number of keys: %v, but got %v", len(tc.result), len(keys)) + } + } else { + t.Errorf("unable to list keys in the path of %s: %v", tc.listKey, err) + } + + for key, data := range tc.result { + b, err := s.Get(key) + if err != nil { + t.Errorf("%s: Got error %v, unable get key %s", k, err, key) + } + + if data != string(b) { + t.Errorf("%s: expect updated data %s, but got %s", k, data, string(b)) + } + } + + for key := range tc.result { + if err = s.Delete(key); err != nil { + t.Errorf("%s failed to delete key %s, %v", k, key, err) + } + } + }) + } + + if err = os.RemoveAll(testDir); err != nil { + t.Errorf("Got error %v, unable remove path %s", err, testDir) + } +} + func TestLockKey(t *testing.T) { testcases := map[string]struct { preLockedKeys []string