Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upload autonomy status correctly #2096

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 7 additions & 27 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
}

type CacheResult struct {
ErrorKeysLength int
ErrMsg string
Length int
Msg string
}

type cacheManager struct {
Expand All @@ -80,7 +80,6 @@
cacheAgents *CacheAgent
listSelectorCollector map[storage.Key]string
inMemoryCache map[string]runtime.Object
errorKeys *errorKeys
}

// NewCacheManager creates a new CacheManager
Expand All @@ -98,16 +97,15 @@
restMapperManager: restMapperMgr,
listSelectorCollector: make(map[storage.Key]string),
inMemoryCache: make(map[string]runtime.Object),
errorKeys: NewErrorKeys(),
}
cm.errorKeys.recover()
return cm
}

func (cm *cacheManager) QueryCacheResult() CacheResult {
length, msg := cm.storage.GetCacheResult()

Check warning on line 105 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L105

Added line #L105 was not covered by tests
return CacheResult{
ErrorKeysLength: cm.errorKeys.length(),
ErrMsg: cm.errorKeys.aggregate(),
Length: length,
Msg: msg,

Check warning on line 108 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}
}

Expand Down Expand Up @@ -427,7 +425,6 @@
}

if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not process watch object %s, %v", key.Key(), err)
}
case watch.Bookmark:
Expand Down Expand Up @@ -496,13 +493,7 @@
Group: info.APIGroup,
Version: info.APIVersion,
})
err = cm.storeObjectWithKey(key, items[0])
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
return err
}
cm.errorKeys.del(key.Key())
return nil
return cm.storeObjectWithKey(key, items[0])

Check warning on line 496 in pkg/yurthub/cachemanager/cache_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/cache_manager.go#L496

Added line #L496 was not covered by tests
} else {
// list all objects or with fieldselector/labelselector
objs := make(map[storage.Key]runtime.Object)
Expand All @@ -526,20 +517,11 @@
objs[key] = items[i]
}
// if no objects in cloud cluster(objs is empty), it will clean the old files in the path of rootkey
err = cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
return cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}, info.Namespace, objs)
if err != nil {
for key := range objs {
cm.errorKeys.put(key.Key(), err.Error())
}
}
for key := range objs {
cm.errorKeys.del(key.Key())
}
return nil
}
}

Expand Down Expand Up @@ -599,11 +581,9 @@

err = cm.storeObjectWithKey(key, obj)
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not store object %s, %v", key.Key(), err)
return err
}
cm.errorKeys.del(key.Key())
return cm.updateInMemoryCache(ctx, info, obj)
}

Expand Down
77 changes: 66 additions & 11 deletions pkg/yurthub/cachemanager/storage_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import (
"bytes"
"fmt"
"strings"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -44,23 +45,28 @@
ListResourceKeysOfComponent(component string, gvr schema.GroupVersionResource) ([]storage.Key, error)
ReplaceComponentList(component string, gvr schema.GroupVersionResource, namespace string, contents map[storage.Key]runtime.Object) error
DeleteComponentResources(component string) error
SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error
GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error)
SaveClusterInfo(key storage.Key, content []byte) error
GetClusterInfo(key storage.Key) ([]byte, error)
GetStorage() storage.Store
GetCacheResult() (int, string)
}

type storageWrapper struct {
sync.RWMutex
store storage.Store
errorKeys *errorKeys
backendSerializer runtime.Serializer
}

// NewStorageWrapper create a StorageWrapper object
func NewStorageWrapper(storage storage.Store) StorageWrapper {
return &storageWrapper{
sw := &storageWrapper{
store: storage,
errorKeys: NewErrorKeys(),
backendSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme.Scheme, scheme.Scheme, json.SerializerOptions{}),
}
sw.errorKeys.recover()
return sw
}

func (sw *storageWrapper) Name() string {
Expand All @@ -75,6 +81,10 @@
return sw.store
}

func (sw *storageWrapper) GetCacheResult() (int, string) {
return sw.errorKeys.length(), sw.errorKeys.aggregate()

Check warning on line 85 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

// Create store runtime object into backend storage
// if obj is nil, the storage used to represent the key
// will be created. for example: for disk storage,
Expand All @@ -83,21 +93,30 @@
var buf bytes.Buffer
if obj != nil {
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
sw.errorKeys.put(key.Key(), err.Error())

Check warning on line 96 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L96

Added line #L96 was not covered by tests
klog.Errorf("could not encode object in create for %s, %v", key.Key(), err)
return err
}
}

if err := sw.store.Create(key, buf.Bytes()); err != nil {
sw.errorKeys.put(key.Key(), err.Error())
return err
}

sw.errorKeys.del(key.Key())
return nil
}

// Delete remove runtime object that by specified key from backend storage
func (sw *storageWrapper) Delete(key storage.Key) error {
return sw.store.Delete(key)
err := sw.store.Delete(key)
if err != nil {
sw.errorKeys.put(key.Key(), fmt.Sprintf("failed to delete, %v", err.Error()))
return err

Check warning on line 116 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}
sw.errorKeys.del(key.Key())
return nil
}

// Get get the runtime object that specified by key from backend storage
Expand All @@ -108,11 +127,13 @@
} else if len(b) == 0 {
return nil, nil
}

//get the gvk from json data
gvk, err := json.DefaultMetaFactory.Interpret(b)
if err != nil {
return nil, err
}

var UnstructuredObj runtime.Object
if scheme.Scheme.Recognizes(*gvk) {
UnstructuredObj = nil
Expand Down Expand Up @@ -175,21 +196,27 @@
func (sw *storageWrapper) Update(key storage.Key, obj runtime.Object, rv uint64) (runtime.Object, error) {
var buf bytes.Buffer
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
sw.errorKeys.put(key.Key(), err.Error())

Check warning on line 199 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L199

Added line #L199 was not covered by tests
klog.Errorf("could not encode object in update for %s, %v", key.Key(), err)
return nil, err
}

if buf, err := sw.store.Update(key, buf.Bytes(), rv); err != nil {
if err == storage.ErrUpdateConflict {
if err == storage.ErrStorageNotFound {
return nil, err
} else if err == storage.ErrUpdateConflict {
obj, _, dErr := sw.backendSerializer.Decode(buf, nil, nil)
if dErr != nil {
sw.errorKeys.put(key.Key(), err.Error())
return nil, fmt.Errorf("could not decode existing obj of key %s, %v", key.Key(), dErr)
}
sw.errorKeys.put(key.Key(), err.Error())
return obj, err
}
sw.errorKeys.put(key.Key(), err.Error())

Check warning on line 216 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L216

Added line #L216 was not covered by tests
return nil, err
}

sw.errorKeys.del(key.Key())
return obj, nil
}

Expand All @@ -198,6 +225,9 @@
contents := make(map[storage.Key][]byte, len(objs))
for key, obj := range objs {
if err := sw.backendSerializer.Encode(obj, &buf); err != nil {
for k := range objs {
sw.errorKeys.put(k.Key(), err.Error())

Check warning on line 229 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L228-L229

Added lines #L228 - L229 were not covered by tests
}
klog.Errorf("could not encode object in update for %s, %v", key.Key(), err)
return err
}
Expand All @@ -206,18 +236,43 @@
buf.Reset()
}

return sw.store.ReplaceComponentList(component, gvr, namespace, contents)
err := sw.store.ReplaceComponentList(component, gvr, namespace, contents)
if err != nil {
for key := range objs {
sw.errorKeys.put(key.Key(), err.Error())

Check warning on line 242 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L241-L242

Added lines #L241 - L242 were not covered by tests
}
return err

Check warning on line 244 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L244

Added line #L244 was not covered by tests
}
for key := range objs {
sw.errorKeys.del(key.Key())
}
return nil
}

// DeleteCollection will delete all objects under rootKey
func (sw *storageWrapper) DeleteComponentResources(component string) error {
return sw.store.DeleteComponentResources(component)
err := sw.store.DeleteComponentResources(component)
if err != nil {
return err

Check warning on line 256 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L256

Added line #L256 was not covered by tests
}
for key := range sw.errorKeys.keys {
if strings.HasPrefix(key, component+"/") {
sw.errorKeys.del(key)
}
}
return nil
}

func (sw *storageWrapper) SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error {
return sw.store.SaveClusterInfo(key, content)
func (sw *storageWrapper) SaveClusterInfo(key storage.Key, content []byte) error {
err := sw.store.SaveClusterInfo(key, content)
if err != nil {
sw.errorKeys.put(key.Key(), fmt.Sprintf("failed to store cluster info, %v", err.Error()))
return err

Check warning on line 270 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L266-L270

Added lines #L266 - L270 were not covered by tests
}
sw.errorKeys.del(key.Key())
return nil

Check warning on line 273 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L272-L273

Added lines #L272 - L273 were not covered by tests
}

func (sw *storageWrapper) GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error) {
func (sw *storageWrapper) GetClusterInfo(key storage.Key) ([]byte, error) {

Check warning on line 276 in pkg/yurthub/cachemanager/storage_wrapper.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/storage_wrapper.go#L276

Added line #L276 was not covered by tests
return sw.store.GetClusterInfo(key)
}
6 changes: 3 additions & 3 deletions pkg/yurthub/proxy/autonomy/autonomy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@

func (ap *AutonomyProxy) updateNodeConditions(originalNode *v1.Node) (*v1.Node, bool) {
node := originalNode.DeepCopy()
if node.Annotations[projectinfo.GetAutonomyAnnotation()] == "false" || node.Labels[projectinfo.GetEdgeWorkerLabelKey()] == "false" {
if node.Annotations[projectinfo.GetAutonomyAnnotation()] != "true" || node.Labels[projectinfo.GetEdgeWorkerLabelKey()] == "false" {
setNodeAutonomyCondition(node, v1.ConditionFalse, "autonomy disabled", "The autonomy is disabled or this node is not edge node")
} else {
res := ap.cacheMgr.QueryCacheResult()
if res.ErrorKeysLength == 0 {
if res.Length == 0 {
setNodeAutonomyCondition(node, v1.ConditionTrue, "autonomy enabled successfully", "The autonomy is enabled and it works fine")
atomic.StoreInt32(ap.cacheFailedCount, 0)
} else {
currentFailures := atomic.AddInt32(ap.cacheFailedCount, 1)
if int(currentFailures) > maxCacheFailures {
setNodeAutonomyCondition(node, v1.ConditionUnknown, "cache failed", res.ErrMsg)
setNodeAutonomyCondition(node, v1.ConditionUnknown, "cache failed", res.Msg)

Check warning on line 181 in pkg/yurthub/proxy/autonomy/autonomy.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/proxy/autonomy/autonomy.go#L181

Added line #L181 was not covered by tests
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/server/nonresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func wrapNonResourceHandler(proxyHandler http.Handler, config *config.YurtHubCon

func localCacheHandler(handler NonResourceHandler, restMgr *rest.RestConfigManager, sw cachemanager.StorageWrapper, path string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[path],
UrlPath: path,
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func localCacheHandler(handler NonResourceHandler, restMgr *rest.RestConfigManag

func nonResourceHandler(kubeClient *kubernetes.Clientset, sw cachemanager.StorageWrapper, path string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[path],
UrlPath: path,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/nonresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestLocalCacheHandler(t *testing.T) {

for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
key := storage.ClusterInfoKey{
key := &storage.ClusterInfoKey{
ClusterInfoType: nonResourceReqPaths[tt.path],
UrlPath: tt.path,
}
Expand Down
32 changes: 9 additions & 23 deletions pkg/yurthub/storage/disk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,50 +421,36 @@
return nil
}

func (ds *diskStorage) SaveClusterInfo(key storage.ClusterInfoKey, content []byte) error {
var path string
switch key.ClusterInfoType {
case storage.APIsInfo, storage.Version:
path = filepath.Join(ds.baseDir, string(key.ClusterInfoType))
case storage.APIResourcesInfo:
translatedURLPath := strings.ReplaceAll(key.UrlPath, "/", "_")
path = filepath.Join(ds.baseDir, translatedURLPath)
default:
func (ds *diskStorage) SaveClusterInfo(key storage.Key, content []byte) error {
if key.Key() == "" {
return storage.ErrUnknownClusterInfoType
}

path := filepath.Join(ds.baseDir, key.Key())
if err := ds.fsOperator.CreateFile(path, content); err != nil {
if err == fs.ErrExists {
// file exists, overwrite it with content
if werr := ds.fsOperator.Write(path, content); werr != nil {
return fmt.Errorf("could not update clusterInfo %s at path %s, %v", key.ClusterInfoType, path, werr)
return fmt.Errorf("could not update clusterInfo at path %s, %v", path, werr)

Check warning on line 433 in pkg/yurthub/storage/disk/storage.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/storage/disk/storage.go#L433

Added line #L433 was not covered by tests
}
return nil
}
return fmt.Errorf("could not create %s clusterInfo file at path %s, %v", key.ClusterInfoType, path, err)
return fmt.Errorf("could not create clusterInfo file at path %s, %v", path, err)

Check warning on line 437 in pkg/yurthub/storage/disk/storage.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/storage/disk/storage.go#L437

Added line #L437 was not covered by tests
}
return nil
}

func (ds *diskStorage) GetClusterInfo(key storage.ClusterInfoKey) ([]byte, error) {
var path string
switch key.ClusterInfoType {
case storage.APIsInfo, storage.Version:
path = filepath.Join(ds.baseDir, string(key.ClusterInfoType))
case storage.APIResourcesInfo:
translatedURLPath := strings.ReplaceAll(key.UrlPath, "/", "_")
path = filepath.Join(ds.baseDir, translatedURLPath)
default:
func (ds *diskStorage) GetClusterInfo(key storage.Key) ([]byte, error) {
if key.Key() == "" {
return nil, storage.ErrUnknownClusterInfoType
}

path := filepath.Join(ds.baseDir, key.Key())
var buf []byte
var err error
if buf, err = ds.fsOperator.Read(path); err != nil {
if err == fs.ErrNotExists {
return nil, storage.ErrStorageNotFound
}
return nil, fmt.Errorf("could not read %s clusterInfo file at %s, %v", key.ClusterInfoType, path, err)
return nil, fmt.Errorf("could not read clusterInfo file at %s, %v", path, err)

Check warning on line 453 in pkg/yurthub/storage/disk/storage.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/storage/disk/storage.go#L453

Added line #L453 was not covered by tests
}
return buf, nil
}
Expand Down
Loading
Loading