Skip to content

Commit

Permalink
feat(freecache): race问题 (#858)
Browse files Browse the repository at this point in the history
Co-authored-by: 彭业昌 <refrain@douyu.tv>
  • Loading branch information
PengYechang and 彭业昌 committed May 9, 2023
1 parent fc7ad03 commit 04215b6
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 37 deletions.
3 changes: 1 addition & 2 deletions pkg/cache/xfreecache/v2/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ func (c *cache[K, V]) GetAndSetCacheMap(key string, ids []K, fn func([]K) (map[K
// id去重
ids = lo.Uniq(ids)
idsNone := make([]K, 0, len(ids))
pool := getPool[V]()
for _, id := range ids {
cacheKey := c.getKey(key, id)
resT, innerErr := c.GetCacheData(cacheKey)
if innerErr == nil && resT != nil {
var value V
// 反序列化
value, innerErr = unmarshalWithPool[V](resT, pool)
value, innerErr = unmarshal[V](resT)
if innerErr != nil {
xlog.Jupiter().Error("cache unmarshalWithPool", zap.String("key", key), zap.Error(err))
} else {
Expand Down
60 changes: 60 additions & 0 deletions pkg/cache/xfreecache/v2/cache_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package xfreecache

import (
helloworldv1 "github.com/douyu/jupiter/proto/helloworld/v1"
"golang.org/x/sync/errgroup"
"strconv"
"testing"
)

// BenchmarkLocalCache_GetCacheData race检测
func BenchmarkLocalCache_GetCacheData(b *testing.B) {
localCache := New[string, Student](DefaultConfig())

Expand All @@ -27,6 +30,63 @@ func BenchmarkLocalCache_GetCacheData(b *testing.B) {
})
}
})

b.Run("read & write & race", func(b *testing.B) {
eg := errgroup.Group{}
for i := 0; i < b.N; i++ {
eg.Go(func() error {
student := Student{10, "student" + strconv.Itoa(i)}
_, _ = localCache.GetAndSetCacheData("mytest", student.Name, func() (Student, error) {
res := student
return res, nil
})
return nil
})
}
_ = eg.Wait()
})
}

// BenchmarkLocalCache_GetCacheData_Proto race检测
func BenchmarkLocalCache_GetCacheData_Proto(b *testing.B) {
localCache := New[int, *helloworldv1.SayHiResponse](DefaultConfig())
b.Run("read", func(b *testing.B) {
for i := 0; i < b.N; i++ {
student := &helloworldv1.SayHiResponse{Error: uint32(i)}
_, _ = localCache.GetAndSetCacheData("mytest", i, func() (*helloworldv1.SayHiResponse, error) {
res := student
return res, nil
})
}
})

b.Run("read & write & race", func(b *testing.B) {
for i := 0; i < b.N; i++ {
student := &helloworldv1.SayHiResponse{Error: uint32(1)}
data, _ := localCache.GetAndSetCacheData("mytest", 1, func() (*helloworldv1.SayHiResponse, error) {
res := student
return res, nil
})
_ = data.Data
}
})

b.Run("read & write & race", func(b *testing.B) {
eg := errgroup.Group{}
for i := 0; i < b.N; i++ {
eg.Go(func() error {
student := &helloworldv1.SayHiResponse{Error: uint32(1)}
data, _ := localCache.GetAndSetCacheData("mytest", 1, func() (*helloworldv1.SayHiResponse, error) {
res := student
return res, nil
})
_ = data.Data

return nil
})
}
_ = eg.Wait()
})
}

func BenchmarkLocalCache_GetCacheMap(b *testing.B) {
Expand Down
35 changes: 7 additions & 28 deletions pkg/cache/xfreecache/v2/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package xfreecache

import (
"encoding/json"
"reflect"
"sync"

"google.golang.org/protobuf/proto"
"reflect"
)

// 序列化,如果是pb格式,则使用proto序列化
Expand All @@ -18,36 +16,17 @@ func marshal[T any](cacheData T) (data []byte, err error) {
return
}

var pools sync.Map

func getPool[T any]() *sync.Pool {
var value T
if msg, ok := any(value).(proto.Message); ok {
// 反序列化,如果是pb格式,则使用proto序列化
func unmarshal[T any](body []byte) (value T, err error) {
if msg, ok := any(value).(proto.Message); ok { // Constrained to proto.Message
// Peek the type inside T (as T= *SomeProtoMsgType)
msgType := reflect.TypeOf(msg).Elem()
if pool, ok2 := pools.Load(msgType.String()); ok2 {
return pool.(*sync.Pool)
}

pool := &sync.Pool{
New: func() any {
// Make a new one, and throw it back into T
msg = reflect.New(msgType).Interface().(proto.Message)
return msg
},
}
pools.Store(msgType.String(), pool)
return pool
}
return nil
}
// Make a new one, and throw it back into T
msg = reflect.New(msgType).Interface().(proto.Message)

// 反序列化,如果是pb格式,则使用proto序列化 使用sync.Pool
func unmarshalWithPool[T any](body []byte, pool *sync.Pool) (value T, err error) {
if _, ok := any(value).(proto.Message); ok { // Constrained to proto.Message
msg := pool.Get().(proto.Message)
err = proto.Unmarshal(body, msg)
value = msg.(T)
pool.Put(msg)
} else {
err = json.Unmarshal(body, &value)
}
Expand Down
34 changes: 27 additions & 7 deletions pkg/cache/xfreecache/v2/encode_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package xfreecache
import (
"encoding/json"
"reflect"
"sync"
"testing"

helloworldv1 "github.com/douyu/jupiter/proto/helloworld/v1"
Expand All @@ -20,7 +21,7 @@ var helloReply = &helloworldv1.SayHiResponse{
}

/*
encoding/json
encoding/json
*/
func BenchmarkDecodeStdStructMedium(b *testing.B) {
res, _ := json.Marshal(helloReply)
Expand Down Expand Up @@ -94,17 +95,36 @@ func BenchmarkDecodeProtoWithReflectAndPool(b *testing.B) {
}
}

// 反序列化,如果是pb格式,则使用proto序列化
func unmarshal[T any](body []byte) (value T, err error) {
if msg, ok := any(value).(proto.Message); ok { // Constrained to proto.Message
// Peek the type inside T (as T= *SomeProtoMsgType)
var pools sync.Map

func getPool[T any]() *sync.Pool {
var value T
if msg, ok := any(value).(proto.Message); ok {
msgType := reflect.TypeOf(msg).Elem()
if pool, ok2 := pools.Load(msgType.String()); ok2 {
return pool.(*sync.Pool)
}

// Make a new one, and throw it back into T
msg = reflect.New(msgType).Interface().(proto.Message)
pool := &sync.Pool{
New: func() any {
// Make a new one, and throw it back into T
msgN := reflect.New(msgType).Interface().(proto.Message)
return msgN
},
}
pools.Store(msgType.String(), pool)
return pool
}
return nil
}

// 反序列化,如果是pb格式,则使用proto序列化 使用sync.Pool-存在并发问题
func unmarshalWithPool[T any](body []byte, pool *sync.Pool) (value T, err error) {
if _, ok := any(value).(proto.Message); ok { // Constrained to proto.Message
msg := pool.Get().(proto.Message)
err = proto.Unmarshal(body, msg)
value = msg.(T)
pool.Put(msg)
} else {
err = json.Unmarshal(body, &value)
}
Expand Down

0 comments on commit 04215b6

Please sign in to comment.