Skip to content

Commit

Permalink
feat: add rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
vie-serendipity committed May 10, 2024
1 parent d4cb796 commit 07c4d5e
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 31 deletions.
14 changes: 6 additions & 8 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -78,7 +79,7 @@ type cacheManager struct {
listSelectorCollector map[storage.Key]string
inMemoryCache map[string]runtime.Object
errorKeys *errorKeys
failedNumber int
failedNumber *int32
}

// NewCacheManager creates a new CacheManager
Expand All @@ -96,11 +97,8 @@ func NewCacheManager(
restMapperManager: restMapperMgr,
listSelectorCollector: make(map[storage.Key]string),
inMemoryCache: make(map[string]runtime.Object),
errorKeys: &errorKeys{
keys: make(map[string]string),
operations: make(chan operation, 100),
},
}
cm.errorKeys = NewErrorKeys()
cm.errorKeys.recover()
return cm
}
Expand Down Expand Up @@ -251,10 +249,10 @@ func (cm *cacheManager) updateNodeStatus(req *http.Request) (runtime.Object, err
}
if cm.errorKeys.length() == 0 {
setNodeAutonomyCondition(node, v1.ConditionTrue, "cache successful", "The autonomy is enabled and it works fine")
cm.failedNumber = 0
atomic.StoreInt32(cm.failedNumber, 0)
} else {
cm.failedNumber++
if cm.failedNumber > 3 {
atomic.AddInt32(cm.failedNumber, 1)
if *cm.failedNumber > 3 {
setNodeAutonomyCondition(node, v1.ConditionUnknown, "cache failed", cm.errorKeys.aggregate())
}
}
Expand Down
98 changes: 83 additions & 15 deletions pkg/yurthub/cachemanager/error_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"k8s.io/klog/v2"
)
Expand All @@ -36,14 +37,30 @@ type errorKeys struct {
sync.RWMutex
keys map[string]string
operations chan operation
file *os.File
count int
cancel context.CancelFunc
}

// NewErrorKeys builds an errorKeys store whose operations are persisted to the
// append-only file "aof" under AOFPrefix.
//
// When the file can be opened, two background goroutines are started: sync,
// which writes queued operations to the file, and compress, which periodically
// triggers rewrite. Both are stopped by calling ek.cancel.
//
// When the file cannot be opened the error is logged and the store is returned
// with persistence disabled: ek.file stays nil and no goroutine is started.
func NewErrorKeys() *errorKeys {
	ek := &errorKeys{
		keys:       make(map[string]string),
		operations: make(chan operation, 10),
		// No-op until persistence is set up, so callers can always invoke
		// ek.cancel() without a nil check.
		cancel: func() {},
	}
	if err := os.MkdirAll(AOFPrefix, 0755); err != nil {
		klog.Errorf("failed to create dir: %v", err)
	}
	// O_APPEND guarantees new records land after whatever is already in the
	// log, regardless of the current read offset of the handle.
	file, err := os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0600)
	if err != nil {
		klog.Errorf("failed to open file, persistency is disabled: %v", err)
		return ek
	}
	ek.file = file

	// The background workers are started only once ek.file is set, and always
	// with a cancellable context so they can be shut down.
	ctx, cancel := context.WithCancel(context.Background())
	ek.cancel = cancel
	go ek.sync(ctx)
	go ek.compress(ctx)
	return ek
}

Expand Down Expand Up @@ -71,7 +88,7 @@ func (ek *errorKeys) del(key string) {
ek.Lock()
defer ek.Unlock()
delete(ek.keys, key)
ek.operations <- operation{Operator: PUT, Key: key}
ek.operations <- operation{Operator: DEL, Key: key}
}

func (ek *errorKeys) aggregate() string {
Expand All @@ -92,35 +109,86 @@ func (ek *errorKeys) length() int {
}

// sync drains ek.operations and appends every operation to ek.file as one
// JSON document per line, flushing the file after each record. It runs until
// ctx is cancelled, at which point it closes ek.file and returns.
//
// NOTE(review): ek.file and ek.count are also touched by rewrite/compress from
// another goroutine. sync cannot take ek's lock here because put/del send on
// ek.operations while holding it — confirm how that access should be guarded.
func (ek *errorKeys) sync(ctx context.Context) {
	for {
		select {
		case op := <-ek.operations:
			data, err := json.Marshal(op)
			if err != nil {
				klog.Errorf("failed to serialize operation: %v", op)
				// Nothing valid to persist; do not write an empty record.
				continue
			}
			if _, err := ek.file.Write(append(data, '\n')); err != nil {
				klog.Errorf("failed to write operation: %v", err)
				continue
			}
			if err := ek.file.Sync(); err != nil {
				klog.Errorf("failed to sync file: %v", err)
			}
			// Counts persisted records; compress uses it to decide when to rewrite.
			ek.count++
		case <-ctx.Done():
			ek.file.Close()
			return
		}
	}
}

func (ek *errorKeys) recover() {
file, err := os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_RDWR, 0600)
// compress wakes up every 10 minutes and calls rewrite when more than 100
// operations have been recorded in ek.count. It returns when ctx is cancelled.
//
// NOTE(review): ek.count is read here without synchronization while sync
// increments it from another goroutine — confirm whether that is acceptable.
func (ek *errorKeys) compress(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Minute)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if ek.count > 100 {
				ek.rewrite()
			}
		case <-ctx.Done():
			return
		}
	}
}

// rewrite compacts the append-only file: it writes one PUT record per entry of
// ek.keys into "tmp_aof", renames that snapshot over "aof", reopens "aof" for
// appending, installs the new handle as ek.file and resets ek.count.
//
// If the snapshot cannot be built or renamed, the error is logged, the
// temporary file is removed and the current log is left untouched. If the
// compacted file cannot be reopened after the rename, ek.cancel is called
// because the old handle no longer points at the live log.
//
// NOTE(review): sync uses ek.file without holding ek's lock, so a record may
// still be written to the old handle while the swap happens. Such records are
// already reflected in the snapshot because put/del are blocked by the lock
// taken here — confirm this is sufficient.
func (ek *errorKeys) rewrite() {
	// Exclusive lock: ek.file and ek.count are modified below, and ek.keys
	// must not change while the snapshot is taken.
	ek.Lock()
	defer ek.Unlock()

	aofPath := filepath.Join(AOFPrefix, "aof")
	tmpPath := filepath.Join(AOFPrefix, "tmp_aof")

	// O_TRUNC drops anything left behind by an earlier, aborted rewrite.
	tmp, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0600)
	if err != nil {
		klog.Errorf("failed to open file: %v", err)
		return
	}
	// discard abandons a partially written snapshot.
	discard := func() {
		tmp.Close()
		os.Remove(tmpPath)
	}

	for key, val := range ek.keys {
		data, err := json.Marshal(operation{Key: key, Val: val, Operator: PUT})
		if err != nil {
			klog.Errorf("failed to serialize operation for key %q: %v", key, err)
			discard()
			return
		}
		if _, err := tmp.Write(append(data, '\n')); err != nil {
			klog.Errorf("failed to write file: %v", err)
			discard()
			return
		}
	}
	if err := tmp.Sync(); err != nil {
		klog.Errorf("failed to sync file: %v", err)
		discard()
		return
	}
	if err := tmp.Close(); err != nil {
		klog.Errorf("failed to close file: %v", err)
		os.Remove(tmpPath)
		return
	}

	// Replace the log with the compacted snapshot (tmp_aof -> aof).
	if err := os.Rename(tmpPath, aofPath); err != nil {
		klog.Errorf("failed to rename file: %v", err)
		os.Remove(tmpPath)
		return
	}

	// O_APPEND so that subsequent records are added after the snapshot
	// instead of overwriting it from offset 0.
	file, err := os.OpenFile(aofPath, os.O_RDWR|os.O_APPEND, 0600)
	if err != nil {
		klog.Errorf("failed to open file: %v", err)
		ek.cancel()
		return
	}

	// Swap handles first, then close the old one, so ek.file never refers to
	// a closed file.
	old := ek.file
	ek.file = file
	ek.count = 0
	if old != nil {
		old.Close()
	}
}

func (ek *errorKeys) recover() {
var file *os.File
var err error
if ek.file == nil {
file, err = os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_RDWR, 0600)
if err != nil {
return
}
} else {
file = ek.file
}
scanner := bufio.NewScanner(file)
var operations []operation
for scanner.Scan() {
Expand Down
39 changes: 31 additions & 8 deletions pkg/yurthub/cachemanager/error_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package cachemanager

import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"testing"
)

Expand Down Expand Up @@ -49,12 +50,7 @@ func TestXxx(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ek := &errorKeys{
keys: make(map[string]string),
operations: make(chan operation, 100),
}
ctx, cancel := context.WithCancel(context.TODO())
go ek.sync(ctx)
ek := NewErrorKeys()
for i := range tc.keys {
ek.put(tc.keys[i], tc.err[i])
}
Expand All @@ -70,8 +66,35 @@ func TestXxx(t *testing.T) {
if ek.length() != 0 {
t.Errorf("expect length %v, got %v", tc.length, ek.length())
}
cancel()
ek.cancel()
os.RemoveAll(AOFPrefix)
})
}
}

// TestRecover seeds the append-only file with a single PUT record, builds a
// store with NewErrorKeys, calls recover and checks that the seeded key is
// present in ek.keys afterwards.
func TestRecover(t *testing.T) {
	op := operation{
		Key:      "kubelet",
		Val:      "fail to xxx",
		Operator: PUT,
	}
	data, err := json.Marshal(op)
	if err != nil {
		t.Fatalf("failed to marshal: %v", err)
	}

	if err := os.MkdirAll(AOFPrefix, 0755); err != nil {
		t.Fatalf("failed to create dir: %v", err)
	}
	// Remove the fixture even when the test bails out early.
	defer os.RemoveAll(AOFPrefix)

	// One JSON document per line, matching the layout the log writer produces.
	if err := os.WriteFile(filepath.Join(AOFPrefix, "aof"), append(data, '\n'), 0600); err != nil {
		t.Fatalf("failed to write file: %v", err)
	}

	ek := NewErrorKeys()
	// Deferred after RemoveAll, so the background goroutines are stopped
	// before the directory is deleted.
	defer func() {
		if ek.cancel != nil {
			ek.cancel()
		}
	}()

	ek.recover()

	ek.RLock()
	_, ok := ek.keys[op.Key]
	ek.RUnlock()
	if !ok {
		t.Errorf("failed to recover")
	}
}

0 comments on commit 07c4d5e

Please sign in to comment.