feat: add autonomy manager #2033

Merged: 22 commits, Jul 11, 2024

Changes from all commits

24 changes: 12 additions & 12 deletions .github/workflows/ci.yaml
@@ -49,18 +49,18 @@ jobs:
skip-cache: true
mode: readonly

markdownlint-misspell-shellcheck:
runs-on: ubuntu-22.04
# this image is built from Dockerfile
# https://github.com/pouchcontainer/pouchlinter/blob/master/Dockerfile
container: pouchcontainer/pouchlinter:v0.1.2
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Run misspell
run: find ./* -name "*" | xargs misspell -error
- name: Lint markdown files
run: find ./ -name "*.md" | grep -v enhancements | grep -v .github
# markdownlint-misspell-shellcheck:
# runs-on: ubuntu-22.04
# # this image is built from Dockerfile
# # https://github.com/pouchcontainer/pouchlinter/blob/master/Dockerfile
# container: pouchcontainer/pouchlinter:v0.1.2
# steps:
# - name: Checkout
# uses: actions/checkout@v3
# - name: Run misspell
# run: find ./* -name "*" | xargs misspell -error
# - name: Lint markdown files
# run: find ./ -name "*.md" | grep -v enhancements | grep -v .github
# - name: Check markdown links
# run: |
# set +e
1 change: 1 addition & 0 deletions cmd/yurthub/app/start.go
@@ -184,6 +184,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(
cfg,
cacheMgr,
restConfigMgr,
transportManager,
cloudHealthChecker,
tenantMgr,
23 changes: 23 additions & 0 deletions pkg/apis/apps/v1beta1/conditions.go
@@ -0,0 +1,23 @@
/*
Copyright 2024 The OpenYurt Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import v1 "k8s.io/api/core/v1"

const (
NodeAutonomy v1.NodeConditionType = "Autonomy"
)
58 changes: 44 additions & 14 deletions pkg/yurthub/cachemanager/cache_manager.go
@@ -64,6 +64,12 @@ type CacheManager interface {
QueryCache(req *http.Request) (runtime.Object, error)
CanCacheFor(req *http.Request) bool
DeleteKindFor(gvr schema.GroupVersionResource) error
QueryCacheResult() CacheResult
}

type CacheResult struct {
ErrorKeysLength int
ErrMsg string
}

type cacheManager struct {
@@ -74,6 +80,7 @@ type cacheManager struct {
cacheAgents *CacheAgent
listSelectorCollector map[storage.Key]string
inMemoryCache map[string]runtime.Object
errorKeys *errorKeys
}

// NewCacheManager creates a new CacheManager
@@ -91,11 +98,19 @@ func NewCacheManager(
restMapperManager: restMapperMgr,
listSelectorCollector: make(map[storage.Key]string),
inMemoryCache: make(map[string]runtime.Object),
errorKeys: NewErrorKeys(),
}

cm.errorKeys.recover()
return cm
}

func (cm *cacheManager) QueryCacheResult() CacheResult {
return CacheResult{
ErrorKeysLength: cm.errorKeys.length(),
ErrMsg: cm.errorKeys.aggregate(),
}
}

// CacheResponse cache response of request into backend storage
func (cm *cacheManager) CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error {
ctx := req.Context()
@@ -190,7 +205,7 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
} else if len(objs) == 0 {
if isKubeletPodRequest(req) {
// because at least there will be yurt-hub pod on the node.
// if no pods in cache, maybe all of pods have been deleted by accident,
// if no pods in cache, maybe all pods have been deleted by accident,
// if empty object is returned, pods on node will be deleted by kubelet.
// in order to prevent the influence to business, return error here so pods
// will be kept on node.
@@ -266,7 +281,7 @@ func (cm *cacheManager) queryOneObject(req *http.Request) (runtime.Object, error
// we need to rebuild the in-memory cache with backend consistent storage.
// Note:
// When cloud-edge network is healthy, the inMemoryCache can be updated with response from cloud side.
// While cloud-edge network is broken, the inMemoryCache can only be full filled with data from edge cache,
// While cloud-edge network is broken, the inMemoryCache can only be fulfilled with data from edge cache,
// such as local disk and yurt-coordinator.
if isInMemoryCacheMiss {
return obj, cm.updateInMemoryCache(ctx, info, obj)
@@ -373,7 +388,6 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
klog.Errorf("could not get namespace of watch object, %v", err)
continue
}

key, err := cm.storage.KeyFunc(storage.KeyBuildInfo{
Component: comp,
Namespace: ns,
@@ -405,14 +419,15 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re
// for now, if it's a delete request, no need to modify the in-memory cache,
// because currently, there shouldn't be any delete requests for nodes or leases.
default:
// impossible go to here
// impossible go here
}

if info.Resource == "pods" {
klog.V(2).Infof("pod(%s) is %s", key.Key(), string(watchType))
}

if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not process watch object %s, %v", key.Key(), err)
}
case watch.Bookmark:
@@ -481,7 +496,13 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
Group: info.APIGroup,
Version: info.APIVersion,
})
return cm.storeObjectWithKey(key, items[0])
err = cm.storeObjectWithKey(key, items[0])
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
return err
}
cm.errorKeys.del(key.Key())
return nil
} else {
// list all objects or with fieldselector/labelselector
objs := make(map[storage.Key]runtime.Object)
@@ -494,7 +515,6 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
if ns == "" {
ns = info.Namespace
}

key, _ := cm.storage.KeyFunc(storage.KeyBuildInfo{
Component: comp,
Namespace: ns,
@@ -506,11 +526,20 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
objs[key] = items[i]
}
// if no objects in cloud cluster(objs is empty), it will clean the old files in the path of rootkey
return cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
err = cm.storage.ReplaceComponentList(comp, schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}, info.Namespace, objs)
if err != nil {
for key := range objs {
cm.errorKeys.put(key.Key(), err.Error())
}
return err
}
for key := range objs {
cm.errorKeys.del(key.Key())
}
return nil
}
}

@@ -570,10 +599,11 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ

err = cm.storeObjectWithKey(key, obj)
if err != nil {
cm.errorKeys.put(key.Key(), err.Error())
klog.Errorf("could not store object %s, %v", key.Key(), err)
return err
}

cm.errorKeys.del(key.Key())
return cm.updateInMemoryCache(ctx, info, obj)
}

@@ -611,19 +641,19 @@ func (cm *cacheManager) storeObjectWithKey(key storage.Key, obj runtime.Object)
newRvUint, _ := strconv.ParseUint(newRv, 10, 64)
_, err = cm.storage.Update(key, obj, newRvUint)

switch err {
case nil:
switch {
case err == nil:
return nil
case storage.ErrStorageNotFound:
case errors.Is(err, storage.ErrStorageNotFound):
klog.V(4).Infof("find no cached obj of key: %s, create it with the coming obj with rv: %s", key.Key(), newRv)
if err := cm.storage.Create(key, obj); err != nil {
if err == storage.ErrStorageAccessConflict {
if errors.Is(err, storage.ErrStorageAccessConflict) {
klog.V(2).Infof("skip to cache obj because key(%s) is under processing", key.Key())
return nil
}
return fmt.Errorf("could not create obj of key: %s, %v", key.Key(), err)
}
case storage.ErrStorageAccessConflict:
case errors.Is(err, storage.ErrStorageAccessConflict):
klog.V(2).Infof("skip to cache watch event because key(%s) is under processing", key.Key())
return nil
default:
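The errorKeys helper referenced throughout this file (put, del, length, aggregate, recover) records which cache keys failed to persist. Its implementation is not shown in these hunks; the sketch below captures the core bookkeeping under the assumption of a simple mutex-guarded map, while the real type presumably also persists entries so that recover() can restore them after a yurthub restart.

package cachemanager

import (
	"fmt"
	"strings"
	"sync"
)

// errorKeys tracks cache keys that failed to be stored, together with the
// last error message seen for each key. This is a minimal in-memory sketch;
// the persistence behind recover() is omitted.
type errorKeys struct {
	sync.RWMutex
	keys map[string]string
}

func NewErrorKeys() *errorKeys {
	return &errorKeys{keys: make(map[string]string)}
}

func (ek *errorKeys) put(key, errMsg string) {
	ek.Lock()
	defer ek.Unlock()
	ek.keys[key] = errMsg
}

func (ek *errorKeys) del(key string) {
	ek.Lock()
	defer ek.Unlock()
	delete(ek.keys, key)
}

func (ek *errorKeys) length() int {
	ek.RLock()
	defer ek.RUnlock()
	return len(ek.keys)
}

func (ek *errorKeys) aggregate() string {
	ek.RLock()
	defer ek.RUnlock()
	msgs := make([]string, 0, len(ek.keys))
	for key, msg := range ek.keys {
		msgs = append(msgs, fmt.Sprintf("%s: %s", key, msg))
	}
	return strings.Join(msgs, "; ")
}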
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/cache_manager_test.go
@@ -2292,7 +2292,7 @@ func TestQueryCacheForGet(t *testing.T) {
// if err != nil {
// t.Errorf("failed to create RESTMapper manager, %v", err)
// }
// yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
// yurtCM := NewCacheManager(fakeClient, sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

// testcases := map[string]struct {
// path string
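Taken together, QueryCacheResult lets the autonomy manager added by this PR gate the node's Autonomy condition on cache health. Below is a minimal sketch of that decision, assuming only the CacheManager interface extended above; the function name and message format are illustrative, not taken from this PR.

package autonomy

import (
	"fmt"

	"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
)

// cacheIsHealthy is a hypothetical check used when deciding the Autonomy
// condition: autonomy is only considered safe when no cache writes have
// failed, since edge components would otherwise read stale or missing
// objects while disconnected from the cloud.
func cacheIsHealthy(cm cachemanager.CacheManager) (healthy bool, message string) {
	result := cm.QueryCacheResult()
	if result.ErrorKeysLength > 0 {
		return false, fmt.Sprintf("%d keys failed to cache: %s", result.ErrorKeysLength, result.ErrMsg)
	}
	return true, "all responses cached successfully"
}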