Skip to content

Commit

Permalink
fix: upload autonomy status correctly (#2096)
Browse files Browse the repository at this point in the history
* fix: upload autonomy status correctly
  • Loading branch information
vie-serendipity committed Jul 16, 2024
1 parent 4ea067d commit 38171bc
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 85 deletions.
34 changes: 7 additions & 27 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type CacheManager interface {
}

type CacheResult struct {
ErrorKeysLength int
ErrMsg string
Length int
Msg string
}

type cacheManager struct {
Expand All @@ -80,7 +80,6 @@ type cacheManager struct {
cacheAgents *CacheAgent
listSelectorCollector map[storage.Key]string
inMemoryCache map[string]runtime.Object
errorKeys *errorKeys
}

// NewCacheManager creates a new CacheManager
Expand All @@ -98,16 +97,15 @@ func NewCacheManager(
restMapperManager: restMapperMgr,
listSelectorCollector: make(map[storage.Key]string),
inMemoryCache: make(map[string]runtime.Object),
errorKeys: NewErrorKeys(),
}
cm.errorKeys.recover()
return cm
}

func (cm *cacheManager) QueryCacheResult() CacheResult {
length, msg := cm.storage.GetCacheResult()
return CacheResult{
ErrorKeysLength: cm.errorKeys.length(),
ErrMsg: cm.errorKeys.aggregate(),
Length: length,
Msg: msg,
}
}

Expand Down Expand Up @@ -427,7 +425,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
}

if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not process watch object %s, %v", key.Key(), err)
}
case watch.Bookmark:
Expand Down Expand Up @@ -496,13 +493,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
Group: info.APIGroup,
Version: info.APIVersion,
})
err = cm.storeObjectWithKey(key, items[0])
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
return err
}
cm.errorKeys.del(key.Key())
return nil
return cm.storeObjectWithKey(key, items[0])
} else {
// list all objects or with fieldselector/labelselector
objs := make(map[storage.Key]runtime.Object)
Expand All @@ -526,20 +517,11 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
objs[key] = items[i]
}
// if there are no objects in the cloud cluster (objs is empty), the old files under the path of rootKey will be cleaned
err = cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
return cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}, info.Namespace, objs)
if err != nil {
for key := range objs {
cm.errorKeys.put(key.Key(), err.Error())
}
}
for key := range objs {
cm.errorKeys.del(key.Key())
}
return nil
}
}

Expand Down Expand Up @@ -599,11 +581,9 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ

err = cm.storeObjectWithKey(key, obj)
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not store object %s, %v", key.Key(), err)
return err
}
cm.errorKeys.del(key.Key())
return cm.updateInMemoryCache(ctx, info, obj)
}

Expand Down
77 changes: 66 additions & 11 deletions pkg/yurthub/cachemanager/storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cachemanager
import (
"bytes"
"fmt"
"strings"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -44,23 +45,28 @@ type StorageWrapper interface {
ListResourceKeysOfComponent(component string, gvr schema.GroupVersionResource) ([]storage.Key, error)
ReplaceComponentList(component string, gvr schema.GroupVersionResource, namespace string, contents map[storage.Key]runtime.Object) error
DeleteComponentResources(component string) error
SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error
GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error)
SaveClusterInfo(key storage.Key, content []byte) error
GetClusterInfo(key storage.Key) ([]byte, error)
GetStorage() storage.Store
GetCacheResult() (int, string)
}

// storageWrapper is the StorageWrapper implementation returned by NewStorageWrapper.
// It serializes runtime objects with backendSerializer before handing them to store,
// and records the keys whose storage operations failed in errorKeys.
type storageWrapper struct {
// NOTE(review): no use of this embedded lock is visible in this view — confirm what it guards.
sync.RWMutex
// store is the backend storage that operations are delegated to.
store storage.Store
// errorKeys tracks keys whose storage operation failed; an entry is removed
// again when a later operation on the same key succeeds.
errorKeys *errorKeys
// backendSerializer encodes objects to bytes before they are written to store
// and decodes bytes returned by store back into objects.
backendSerializer runtime.Serializer
}

// NewStorageWrapper creates a StorageWrapper object
func NewStorageWrapper(storage storage.Store) StorageWrapper {
return &storageWrapper{
sw := &storageWrapper{
store: storage,
errorKeys: NewErrorKeys(),
backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}),
}
sw.errorKeys.recover()
return sw
}

func (sw *storageWrapper) Name() string {
Expand All @@ -75,6 +81,10 @@ func (sw *storageWrapper) GetStorage() storage.Store {
return sw.store
}

// GetCacheResult reports the state of the wrapper's error-key tracker.
// The first result is errorKeys.length() and the second is the string
// produced by errorKeys.aggregate().
func (sw *storageWrapper) GetCacheResult() (int, string) {
	count := sw.errorKeys.length()
	summary := sw.errorKeys.aggregate()
	return count, summary
}

// Create stores the runtime object in backend storage.
// If obj is nil, the storage used to represent the key
// will be created. For example: for disk storage,
Expand All @@ -83,21 +93,30 @@ func (sw *storageWrapper) Create(key storage.Key, obj runtime.Object) error {
var buf bytes.Buffer
if obj != nil {
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
sw.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not encode object in create for %s, %v", key.Key(), err)
return err
}
}

if err := sw.store.Create(key, buf.Bytes()); err != nil {
sw.errorKeys.put(key.Key(), err.Error())
return err
}

sw.errorKeys.del(key.Key())
return nil
}

// Delete removes the runtime object specified by key from backend storage
func (sw *storageWrapper) Delete(key storage.Key) error {
return sw.store.Delete(key)
err := sw.store.Delete(key)
if err != nil {
sw.errorKeys.put(key.Key(), fmt.Sprintf("failed to delete, %v", err.Error()))
return err
}
sw.errorKeys.del(key.Key())
return nil
}

// Get returns the runtime object specified by key from backend storage
Expand All @@ -108,11 +127,13 @@ func (sw *storageWrapper) Get(key storage.Key) (runtime.Object, error) {
} else if len(b) == 0 {
return nil, nil
}

// get the GVK from the JSON data
gvk, err := json.DefaultMetaFactory.Interpret(b)
if err != nil {
return nil, err
}

var UnstructuredObj runtime.Object
if scheme.Scheme.Recognizes(*gvk) {
UnstructuredObj = nil
Expand Down Expand Up @@ -175,21 +196,27 @@ func (sw *storageWrapper) List(key storage.Key) ([]runtime.Object, error) {
func (sw *storageWrapper) Update(key storage.Key, obj runtime.Object, rv uint64) (runtime.Object, error) {
var buf bytes.Buffer
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
sw.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not encode object in update for %s, %v", key.Key(), err)
return nil, err
}

if buf, err := sw.store.Update(key, buf.Bytes(), rv); err != nil {
if err == storage.ErrUpdateConflict {
if err == storage.ErrStorageNotFound {
return nil, err
} else if err == storage.ErrUpdateConflict {
obj, _, dErr := sw.backendSerializer.Decode(buf, nil, nil)
if dErr != nil {
sw.errorKeys.put(key.Key(), err.Error())
return nil, fmt.Errorf("could not decode existing obj of key %s, %v", key.Key(), dErr)
}
sw.errorKeys.put(key.Key(), err.Error())
return obj, err
}
sw.errorKeys.put(key.Key(), err.Error())
return nil, err
}

sw.errorKeys.del(key.Key())
return obj, nil
}

Expand All @@ -198,6 +225,9 @@ func (sw *storageWrapper) ReplaceComponentList(component string, gvr schema.Grou
contents := make(map[storage.Key][]byte, len(objs))
for key, obj := range objs {
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
for k := range objs {
sw.errorKeys.put(k.Key(), err.Error())
}
klog.Errorf("could not encode object in update for %s, %v", key.Key(), err)
return err
}
Expand All @@ -206,18 +236,43 @@ func (sw *storageWrapper) ReplaceComponentList(component string, gvr schema.Grou
buf.Reset()
}

return sw.store.ReplaceComponentList(component, gvr, namespace, contents)
err := sw.store.ReplaceComponentList(component, gvr, namespace, contents)
if err != nil {
for key := range objs {
sw.errorKeys.put(key.Key(), err.Error())
}
return err
}
for key := range objs {
sw.errorKeys.del(key.Key())
}
return nil
}

// DeleteComponentResources will delete all objects stored for the given component
func (sw *storageWrapper) DeleteComponentResources(component string) error {
return sw.store.DeleteComponentResources(component)
err := sw.store.DeleteComponentResources(component)
if err != nil {
return err
}
for key := range sw.errorKeys.keys {
if strings.HasPrefix(key, component+"/") {
sw.errorKeys.del(key)
}
}
return nil
}

func (sw *storageWrapper) SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error {
return sw.store.SaveClusterInfo(key, content)
func (sw *storageWrapper) SaveClusterInfo(key storage.Key, content []byte) error {
err := sw.store.SaveClusterInfo(key, content)
if err != nil {
sw.errorKeys.put(key.Key(), fmt.Sprintf("failed to store cluster info, %v", err.Error()))
return err
}
sw.errorKeys.del(key.Key())
return nil
}

func (sw *storageWrapper) GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error) {
func (sw *storageWrapper) GetClusterInfo(key storage.Key) ([]byte, error) {
return sw.store.GetClusterInfo(key)
}
6 changes: 3 additions & 3 deletions pkg/yurthub/proxy/autonomy/autonomy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@ func (ap *AutonomyProxy) tryUpdateNodeConditions(tryNumber int, req *http.Reques

func (ap *AutonomyProxy) updateNodeConditions(originalNode *v1.Node) (*v1.Node, bool) {
node := originalNode.DeepCopy()
if node.Annotations[projectinfo.GetAutonomyAnnotation()] == "false" || node.Labels[projectinfo.GetEdgeWorkerLabelKey()] == "false" {
if node.Annotations[projectinfo.GetAutonomyAnnotation()] != "true" || node.Labels[projectinfo.GetEdgeWorkerLabelKey()] == "false" {
setNodeAutonomyCondition(node, v1.ConditionFalse, "autonomy disabled", "The autonomy is disabled or this node is not edge node")
} else {
res := ap.cacheMgr.QueryCacheResult()
if res.ErrorKeysLength == 0 {
if res.Length == 0 {
setNodeAutonomyCondition(node, v1.ConditionTrue, "autonomy enabled successfully", "The autonomy is enabled and it works fine")
atomic.StoreInt32(ap.cacheFailedCount, 0)
} else {
currentFailures := atomic.AddInt32(ap.cacheFailedCount, 1)
if int(currentFailures) > maxCacheFailures {
setNodeAutonomyCondition(node, v1.ConditionUnknown, "cache failed", res.ErrMsg)
setNodeAutonomyCondition(node, v1.ConditionUnknown, "cache failed", res.Msg)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/server/nonresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func wrapNonResourceHandler(proxyHandler http.Handler, config *config.YurtHubCon

func localCacheHandler(handler NonResourceHandler, restMgr *rest.RestConfigManager, sw cachemanager.StorageWrapper, path string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[path],
UrlPath: path,
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func localCacheHandler(handler NonResourceHandler, restMgr *rest.RestConfigManag

func nonResourceHandler(kubeClient *kubernetes.Clientset, sw cachemanager.StorageWrapper, path string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[path],
UrlPath: path,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/nonresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestLocalCacheHandler(t *testing.T) {

for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[tt.path],
UrlPath: tt.path,
}
Expand Down
32 changes: 9 additions & 23 deletions pkg/yurthub/storage/disk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,50 +421,36 @@ func (ds *diskStorage) DeleteComponentResources(component string) error {
return nil
}

func (ds *diskStorage) SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error {
var path string
switch key.ClusterInfoType {
case storage.APIsInfo, storage.Version:
path = filepath.Join(ds.baseDir, string(key.ClusterInfoType))
case storage.APIResourcesInfo:
translatedURLPath := strings.ReplaceAll(key.UrlPath, "/", "_")
path = filepath.Join(ds.baseDir, translatedURLPath)
default:
func (ds *diskStorage) SaveClusterInfo(key storage.Key, content []byte) error {
if key.Key() == "" {
return storage.ErrUnknownClusterInfoType
}

path := filepath.Join(ds.baseDir, key.Key())
if err := ds.fsOperator.CreateFile(path, content); err != nil {
if err == fs.ErrExists {
// file exists, overwrite it with content
if werr := ds.fsOperator.Write(path, content); werr != nil {
return fmt.Errorf("could not update clusterInfo %s at path %s, %v", key.ClusterInfoType, path, werr)
return fmt.Errorf("could not update clusterInfo at path %s, %v", path, werr)
}
return nil
}
return fmt.Errorf("could not create %s clusterInfo file at path %s, %v", key.ClusterInfoType, path, err)
return fmt.Errorf("could not create clusterInfo file at path %s, %v", path, err)
}
return nil
}

func (ds *diskStorage) GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error) {
var path string
switch key.ClusterInfoType {
case storage.APIsInfo, storage.Version:
path = filepath.Join(ds.baseDir, string(key.ClusterInfoType))
case storage.APIResourcesInfo:
translatedURLPath := strings.ReplaceAll(key.UrlPath, "/", "_")
path = filepath.Join(ds.baseDir, translatedURLPath)
default:
func (ds *diskStorage) GetClusterInfo(key storage.Key) ([]byte, error) {
if key.Key() == "" {
return nil, storage.ErrUnknownClusterInfoType
}

path := filepath.Join(ds.baseDir, key.Key())
var buf []byte
var err error
if buf, err = ds.fsOperator.Read(path); err != nil {
if err == fs.ErrNotExists {
return nil, storage.ErrStorageNotFound
}
return nil, fmt.Errorf("could not read %s clusterInfo file at %s, %v", key.ClusterInfoType, path, err)
return nil, fmt.Errorf("could not read clusterInfo file at %s, %v", path, err)
}
return buf, nil
}
Expand Down
Loading

0 comments on commit 38171bc

Please sign in to comment.