Skip to content

Commit

Permalink
fix querying local empty list error (openyurtio#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc authored Aug 6, 2021
1 parent 070549b commit b73e51e
Show file tree
Hide file tree
Showing 13 changed files with 874 additions and 132 deletions.
4 changes: 4 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
Expand Down Expand Up @@ -68,6 +69,7 @@ type YurtHubConfiguration struct {
HubAgentDummyIfName string
StorageWrapper cachemanager.StorageWrapper
SerializerManager *serializer.SerializerManager
RESTMapperManager *meta.RESTMapperManager
TLSConfig *tls.Config
MutatedMasterServiceAddr string
Filters *filter.Filters
Expand All @@ -89,6 +91,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
storageWrapper := cachemanager.NewStorageWrapper(storageManager)
serializerManager := serializer.NewSerializerManager()
restMapperManager := meta.NewRESTMapperManager(storageManager)

hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort)
proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort)
Expand Down Expand Up @@ -141,6 +144,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
HubAgentDummyIfName: options.HubAgentDummyIfName,
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
MutatedMasterServiceAddr: mutatedMasterServiceAddr,
Filters: filters,
SharedFactory: sharedFactory,
Expand Down
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager)
if err != nil {
klog.Errorf("could not new cache manager, %v", err)
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestInitCacheAgents(t *testing.T) {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)
m, _ := NewCacheManager(s, nil, nil)

// default cache agents in fake store
b, err := s.GetRaw(cacheAgentsKey)
Expand All @@ -52,7 +52,7 @@ func TestInitCacheAgents(t *testing.T) {
// add agents for next init cache
_ = m.UpdateCacheAgents([]string{"agent1"})

_, _ = NewCacheManager(s, nil)
_, _ = NewCacheManager(s, nil, nil)

b2, err := s.GetRaw(cacheAgentsKey)
if err != nil {
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUpdateCacheAgents(t *testing.T) {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)
m, _ := NewCacheManager(s, nil, nil)

testcases := map[string]struct {
desc string
Expand Down
72 changes: 51 additions & 21 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"

hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
Expand All @@ -52,12 +53,14 @@ type CacheManager interface {
UpdateCacheAgents(agents []string) error
ListCacheAgents() []string
CanCacheFor(req *http.Request) bool
DeleteKindFor(gvr schema.GroupVersionResource) error
}

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
restMapperManager *hubmeta.RESTMapperManager
cacheAgents map[string]bool
listSelectorCollector map[string]string
}
Expand All @@ -66,10 +69,12 @@ type cacheManager struct {
func NewCacheManager(
storage StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
) (CacheManager, error) {
cm := &cacheManager{
storage: storage,
serializerManager: serializerMgr,
restMapperManager: restMapperMgr,
cacheAgents: make(map[string]bool),
listSelectorCollector: make(map[string]string),
}
Expand Down Expand Up @@ -148,21 +153,38 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro

var gvk schema.GroupVersionKind
var kind string
// If the GVR information is not recognized, return 404 not found directly
gvr := schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}
if _, gvk = cm.restMapperManager.KindFor(gvr); gvk.Empty() {
return nil, hubmeta.ErrGVRNotRecognized
} else {
kind = gvk.Kind
}

// If the GVR information is recognized, return list or empty list
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
} else if len(objs) == 0 {
gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
})
if err != nil {
if err != storage.ErrStorageNotFound {
return nil, err
} else if isPodKey(key) {
// because at least there will be yurt-hub pod on the node.
// if no pods in cache, maybe all of pods have been deleted by accident,
// if empty object is returned, pods on node will be deleted by kubelet.
// in order to prevent the influence to business, return error here so pods
// will be kept on node.
return nil, err
}
kind = gvk.Kind
} else {
kind = objs[0].GetObjectKind().GroupVersionKind().Kind
} else if len(objs) != 0 {
// If restMapper's kind and object's kind are inconsistent, use the object's kind
objKind := objs[0].GetObjectKind().GroupVersionKind().Kind
if kind != objKind {
klog.Warningf("The restMapper's kind(%v) and object's kind(%v) are inconsistent ", kind, objKind)
kind = objKind
}
}

var listObj runtime.Object
Expand Down Expand Up @@ -348,16 +370,13 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}.String()
accessor := meta.NewAccessor()

// Verify if DynamicRESTMapper(which store the CRD info) needs to be updated
if err := cm.restMapperManager.UpdateKind(schema.GroupVersionKind{Group: info.APIGroup, Version: info.APIVersion, Kind: kind}); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

comp, _ := util.ClientComponentFrom(ctx)
// even if no objects in cloud cluster, we need to
// make up a storage that represents the no resources
// in local disk, so when cloud-edge network disconnected,
// yurthub can return empty objects instead of 404 code(not found)
if len(items) == 0 {
// list returns no objects
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
} else if info.Name != "" && len(items) == 1 {
if info.Name != "" && len(items) == 1 {
// list with fieldSelector=metadata.name=xxx
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
Expand Down Expand Up @@ -390,7 +409,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
objs[key] = items[i]
}

// if no objects in cloud cluster(objs is empty), it will clean the old files in the path of rootkey
return cm.storage.Replace(rootKey, objs)
}
}
Expand Down Expand Up @@ -436,6 +455,12 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
return err
}

// Verify if DynamicRESTMapper(which store the CRD info) needs to be updated
gvk := obj.GetObjectKind().GroupVersionKind()
if err := cm.restMapperManager.UpdateKind(gvk); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

if err := cm.saveOneObjectWithValidation(key, obj); err != nil {
if err != storage.ErrStorageAccessConflict {
return err
Expand Down Expand Up @@ -600,3 +625,8 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
}
return true
}

// DeleteKindFor is used to delete the invalid Kind(which is not registered in the cloud)
func (cm *cacheManager) DeleteKindFor(gvr schema.GroupVersionResource) error {
return cm.restMapperManager.DeleteKindFor(gvr)
}
Loading

0 comments on commit b73e51e

Please sign in to comment.