diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 9aff0e37f3e..55eb6c1ddd9 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -96,8 +96,8 @@ func NewCacheManager( restMapperManager: restMapperMgr, listSelectorCollector: make(map[storage.Key]string), inMemoryCache: make(map[string]runtime.Object), + errorKeys: NewErrorKeys(), } - cm.errorKeys = NewErrorKeys() cm.errorKeys.recover() return cm } diff --git a/pkg/yurthub/cachemanager/error_keys.go b/pkg/yurthub/cachemanager/error_keys.go index b8240ea85a9..d736bc3e2d5 100644 --- a/pkg/yurthub/cachemanager/error_keys.go +++ b/pkg/yurthub/cachemanager/error_keys.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) @@ -33,6 +34,10 @@ const ( AOFPrefix = "/tmp/errorkeys" ) +var ( + CompressThresh = 50 +) + type errorKeys struct { sync.RWMutex keys map[string]string @@ -45,19 +50,20 @@ type errorKeys struct { func NewErrorKeys() *errorKeys { ek := &errorKeys{ keys: make(map[string]string), - operations: make(chan operation, 10), + operations: make(chan operation, 100), } err := os.MkdirAll(AOFPrefix, 0755) if err != nil { klog.Errorf("failed to create dir: %v", err) + return ek } file, err := os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_CREATE|os.O_RDWR, 0600) if err != nil { - klog.Error("failed to open file, persistency is disabled: %v", err) + klog.Errorf("failed to open file, persistency is disabled: %v", err) return ek } ek.file = file - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) ek.cancel = cancel go ek.sync(ctx) go ek.compress(ctx) @@ -81,14 +87,24 @@ func (ek *errorKeys) put(key string, val string) { ek.Lock() defer ek.Unlock() ek.keys[key] = val - ek.operations <- operation{Operator: PUT, Key: key, Val: val} + select { + case ek.operations <- operation{Operator: PUT, Key: key, Val: val}: + klog.Warningf("failed to cache key %s", key) + default: + klog.Errorf("failed to persist error keys %s, channel is full", key) + } } func (ek *errorKeys) del(key string) { ek.Lock() defer ek.Unlock() delete(ek.keys, key) - ek.operations <- operation{Operator: DEL, Key: key} + select { + case ek.operations <- operation{Operator: DEL, Key: key}: + klog.Infof("delete error key %s successfully", key) + default: + klog.Errorf("failed to delete error keys %s", key) + } } func (ek *errorKeys) aggregate() string { @@ -127,11 +143,11 @@ func (ek *errorKeys) sync(ctx context.Context) { } func (ek *errorKeys) compress(ctx context.Context) { - ticker := time.NewTicker(10 * time.Minute) + ticker := time.NewTicker(10 * time.Second) for { select { case <-ticker.C: - if ek.count > 100 { + if ek.count > CompressThresh { ek.rewrite() } case <-ctx.Done(): @@ -143,6 +159,7 @@ func (ek *errorKeys) compress(ctx context.Context) { func (ek *errorKeys) rewrite() { ek.RLock() defer ek.RUnlock() + count := 0 file, err := os.OpenFile(filepath.Join(AOFPrefix, "tmp_aof"), os.O_CREATE|os.O_RDWR, 0600) if err != nil { klog.Errorf("failed to open file: %v", err) @@ -159,15 +176,24 @@ func (ek *errorKeys) rewrite() { return } file.Write(append(data, '\n')) + count++ } file.Sync() file.Close() ek.file.Close() - err = os.Rename(filepath.Join(AOFPrefix, "aof"), filepath.Join(AOFPrefix, "tmp_aof")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, + func(ctx context.Context) (bool, error) { + if len(ek.operations) == 0 { + return true, nil + } + return false, nil + }) + err = os.Rename(filepath.Join(AOFPrefix, "tmp_aof"), filepath.Join(AOFPrefix, "aof")) if err != nil { - ek.cancel() - return + klog.Errorf("failed to rename tmp_aof to aof, %v", err) } file, err = os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_RDWR, 0600) if err != nil { @@ -175,7 +201,7 @@ func (ek *errorKeys) rewrite() { return } ek.file = file - ek.count = 0 + ek.count = count } func (ek *errorKeys) recover() { diff --git a/pkg/yurthub/cachemanager/error_keys_test.go b/pkg/yurthub/cachemanager/error_keys_test.go index 56ed58109b9..938d8ae3caf 100644 --- a/pkg/yurthub/cachemanager/error_keys_test.go +++ b/pkg/yurthub/cachemanager/error_keys_test.go @@ -17,12 +17,17 @@ limitations under the License. package cachemanager import ( + "context" "encoding/json" "errors" + "fmt" "os" "path/filepath" "strings" "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" ) func TestXxx(t *testing.T) { @@ -102,3 +107,26 @@ func TestRecover(t *testing.T) { ek.cancel() os.RemoveAll(AOFPrefix) } + +func TestCompress(t *testing.T) { + keys := NewErrorKeys() + for i := 0; i < 100; i++ { + keys.put(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)) + } + for i := 0; i < 50; i++ { + keys.del(fmt.Sprintf("key-%d", i)) + } + for i := 0; i < 50; i++ { + keys.put(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i)) + } + err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, false, + func(ctx context.Context) (bool, error) { + if keys.count == 100 { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("failed to sync") + } +}