Skip to content

Commit

Permalink
test: unit test for compress
Browse files Browse the repository at this point in the history
  • Loading branch information
vie-serendipity committed Jun 4, 2024
1 parent 7b6b0a9 commit b31d834
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func NewCacheManager(
restMapperManager: restMapperMgr,
listSelectorCollector: make(map[storage.Key]string),
inMemoryCache: make(map[string]runtime.Object),
errorKeys: NewErrorKeys(),
}
cm.errorKeys = NewErrorKeys()
cm.errorKeys.recover()
return cm
}
Expand Down
48 changes: 37 additions & 11 deletions pkg/yurthub/cachemanager/error_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

const (
AOFPrefix = "/tmp/errorkeys"
)

var (
CompressThresh = 50
)

type errorKeys struct {
sync.RWMutex
keys map[string]string
Expand All @@ -45,19 +50,20 @@ type errorKeys struct {
func NewErrorKeys() *errorKeys {
ek := &errorKeys{
keys: make(map[string]string),
operations: make(chan operation, 10),
operations: make(chan operation, 100),
}
err := os.MkdirAll(AOFPrefix, 0755)
if err != nil {
klog.Errorf("failed to create dir: %v", err)
return ek

Check warning on line 58 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}
file, err := os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
klog.Error("failed to open file, persistency is disabled: %v", err)
klog.Errorf("failed to open file, persistency is disabled: %v", err)
return ek

Check warning on line 63 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}
ek.file = file
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
ek.cancel = cancel
go ek.sync(ctx)
go ek.compress(ctx)
Expand All @@ -81,14 +87,24 @@ func (ek *errorKeys) put(key string, val string) {
ek.Lock()
defer ek.Unlock()
ek.keys[key] = val
ek.operations <- operation{Operator: PUT, Key: key, Val: val}
select {
case ek.operations <- operation{Operator: PUT, Key: key, Val: val}:
klog.Warningf("failed to cache key %s", key)
default:
klog.Errorf("failed to persist error keys %s, channel is full", key)
}
}

func (ek *errorKeys) del(key string) {
ek.Lock()
defer ek.Unlock()
delete(ek.keys, key)
ek.operations <- operation{Operator: DEL, Key: key}
select {
case ek.operations <- operation{Operator: DEL, Key: key}:
klog.Infof("delete error key %s successfully", key)
default:
klog.Errorf("failed to delete error keys %s", key)
}
}

func (ek *errorKeys) aggregate() string {
Expand Down Expand Up @@ -127,11 +143,11 @@ func (ek *errorKeys) sync(ctx context.Context) {
}

func (ek *errorKeys) compress(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ticker.C:
if ek.count > 100 {
if ek.count > CompressThresh {
ek.rewrite()
}
case <-ctx.Done():
Expand All @@ -143,6 +159,7 @@ func (ek *errorKeys) compress(ctx context.Context) {
func (ek *errorKeys) rewrite() {
ek.RLock()
defer ek.RUnlock()
count := 0
file, err := os.OpenFile(filepath.Join(AOFPrefix, "tmp_aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
klog.Errorf("failed to open file: %v", err)
Expand All @@ -159,23 +176,32 @@ func (ek *errorKeys) rewrite() {
return

Check warning on line 176 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L176

Added line #L176 was not covered by tests
}
file.Write(append(data, '\n'))
count++
}
file.Sync()
file.Close()
ek.file.Close()

err = os.Rename(filepath.Join(AOFPrefix, "aof"), filepath.Join(AOFPrefix, "tmp_aof"))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
if len(ek.operations) == 0 {
return true, nil
}
return false, nil

Check warning on line 192 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L192

Added line #L192 was not covered by tests
})
err = os.Rename(filepath.Join(AOFPrefix, "tmp_aof"), filepath.Join(AOFPrefix, "aof"))
if err != nil {
ek.cancel()
return
klog.Errorf("failed to rename tmp_aof to aof, %v", err)

Check warning on line 196 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L196

Added line #L196 was not covered by tests
}
file, err = os.OpenFile(filepath.Join(AOFPrefix, "aof"), os.O_RDWR, 0600)
if err != nil {
ek.cancel()
return

Check warning on line 201 in pkg/yurthub/cachemanager/error_keys.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/cachemanager/error_keys.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}
ek.file = file
ek.count = 0
ek.count = count
}

func (ek *errorKeys) recover() {
Expand Down
28 changes: 28 additions & 0 deletions pkg/yurthub/cachemanager/error_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ limitations under the License.
package cachemanager

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/wait"
)

func TestXxx(t *testing.T) {
Expand Down Expand Up @@ -102,3 +107,26 @@ func TestRecover(t *testing.T) {
ek.cancel()
os.RemoveAll(AOFPrefix)
}

func TestCompress(t *testing.T) {
keys := NewErrorKeys()
for i := 0; i < 100; i++ {
keys.put(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i))
}
for i := 0; i < 50; i++ {
keys.del(fmt.Sprintf("key-%d", i))
}
for i := 0; i < 50; i++ {
keys.put(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i))
}
err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, false,
func(ctx context.Context) (bool, error) {
if keys.count == 100 {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("failed to sync")
}
}

0 comments on commit b31d834

Please sign in to comment.