Skip to content

Commit

Permalink
use xsync.Map in cachekv
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Mar 17, 2024
1 parent 934fe0c commit e6a0af3
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 65 deletions.
13 changes: 7 additions & 6 deletions store/cachekv/search_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"cosmossdk.io/store/cachekv/internal"
"github.com/puzpuzpuz/xsync/v3"
)

func BenchmarkLargeUnsortedMisses(b *testing.B) {
Expand All @@ -22,18 +23,18 @@ func BenchmarkLargeUnsortedMisses(b *testing.B) {
}

func generateStore() *Store {
cache := map[string]*cValue{}
unsorted := map[string]struct{}{}
cache := xsync.NewMap()
unsorted := xsync.NewMap()
for i := 0; i < 5000; i++ {
key := "A" + strconv.Itoa(i)
unsorted[key] = struct{}{}
cache[key] = &cValue{}
unsorted.Store(key, struct{}{})
cache.Store(key, &cValue{})
}

for i := 0; i < 5000; i++ {
key := "Z" + strconv.Itoa(i)
unsorted[key] = struct{}{}
cache[key] = &cValue{}
unsorted.Store(key, struct{}{})
cache.Store(key, &cValue{})
}

return &Store{
Expand Down
100 changes: 41 additions & 59 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"bytes"
"io"
"sort"
"sync"

dbm "github.com/cosmos/cosmos-db"
"github.com/puzpuzpuz/xsync/v3"

"cosmossdk.io/math"
"cosmossdk.io/store/cachekv/internal"
Expand All @@ -25,9 +25,8 @@ type cValue struct {

// Store wraps an in-memory cache around an underlying types.KVStore.
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
unsortedCache map[string]struct{}
cache *xsync.Map
unsortedCache *xsync.Map
sortedCache internal.BTree // always ascending sorted
parent types.KVStore
}
Expand All @@ -37,8 +36,8 @@ var _ types.CacheKVStore = (*Store)(nil)
// NewStore creates a new Store object
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
cache: xsync.NewMap(),
unsortedCache: xsync.NewMap(),
sortedCache: internal.NewBTree(),
parent: parent,
}
Expand All @@ -51,17 +50,14 @@ func (store *Store) GetStoreType() types.StoreType {

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()

types.AssertValidKey(key)

cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)]
cacheValue, ok := store.cache.Load(conv.UnsafeBytesToStr(key))
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false)
} else {
value = cacheValue.value
value = cacheValue.(*cValue).value
}

return value
Expand All @@ -71,9 +67,6 @@ func (store *Store) Get(key []byte) (value []byte) {
func (store *Store) Set(key, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)

store.mtx.Lock()
defer store.mtx.Unlock()
store.setCacheValue(key, value, true)
}

Expand All @@ -86,41 +79,27 @@ func (store *Store) Has(key []byte) bool {
// Delete implements types.KVStore.
func (store *Store) Delete(key []byte) {
types.AssertValidKey(key)

store.mtx.Lock()
defer store.mtx.Unlock()

store.setCacheValue(key, nil, true)
}

func (store *Store) resetCaches() {
if len(store.cache) > 100_000 {
if store.cache.Size() > 100_000 {
// Cache is too large. We likely did something linear time
// (e.g. Epoch block, Genesis block, etc). Free the old caches from memory, and let them get re-allocated.
// TODO: In a future CacheKV redesign, such linear workloads should get into a different cache instantiation.
// 100_000 is arbitrarily chosen as it solved Osmosis' InitGenesis RAM problem.
store.cache = make(map[string]*cValue)
store.unsortedCache = make(map[string]struct{})
store.cache.Clear()
store.unsortedCache.Clear()
} else {
// Clear the cache using the map clearing idiom
// and not allocating fresh objects.
// Please see https://bencher.orijtech.com/perfclinic/mapclearing/
for key := range store.cache {
delete(store.cache, key)
}
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
store.cache.Clear()
store.unsortedCache.Clear()
}
store.sortedCache = internal.NewBTree()
}

// Implements Cachetypes.KVStore.
func (store *Store) Write() {
store.mtx.Lock()
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
if store.cache.Size() == 0 && store.unsortedCache.Size() == 0 {
store.sortedCache = internal.NewBTree()
return
}
Expand All @@ -133,13 +112,15 @@ func (store *Store) Write() {
// We need a copy of all of the keys.
// Not the best. To reduce RAM pressure, we copy the values as well
// and clear out the old caches right after the copy.
sortedCache := make([]cEntry, 0, len(store.cache))
sortedCache := make([]cEntry, 0, store.cache.Size())

for key, dbValue := range store.cache {
store.cache.Range(func(key string, value interface{}) bool {
dbValue := value.(*cValue)
if dbValue.dirty {
sortedCache = append(sortedCache, cEntry{key, dbValue})
}
}
return true
})
store.resetCaches()
sort.Slice(sortedCache, func(i, j int) bool {
return sortedCache[i].key < sortedCache[j].key
Expand Down Expand Up @@ -185,9 +166,6 @@ func (store *Store) ReverseIterator(start, end []byte) types.Iterator {
}

func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()

store.dirtyItems(start, end)
isoSortedCache := store.sortedCache.Copy()

Expand Down Expand Up @@ -300,7 +278,7 @@ func (store *Store) dirtyItems(start, end []byte) {
return
}

n := len(store.unsortedCache)
n := store.unsortedCache.Size()
unsorted := make([]*kv.Pair, 0)
// If the unsortedCache is too big, its costs too much to determine
// whats in the subset we are concerned about.
Expand All @@ -309,23 +287,25 @@ func (store *Store) dirtyItems(start, end []byte) {
// Even without that, too many range checks eventually becomes more expensive
// than just not having the cache.
if n < minSortSize {
for key := range store.unsortedCache {
store.unsortedCache.Range(func(key string, _ interface{}) bool {
// dbm.IsKeyInDomain is nil safe and returns true iff key is greater than start
if dbm.IsKeyInDomain(conv.UnsafeStrToBytes(key), start, end) {
cacheValue := store.cache[key]
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value})
cacheValue, _ := store.cache.Load(key)
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.(*cValue).value})
}
}
return true
})
store.clearUnsortedCacheSubset(unsorted, stateUnsorted)
return
}

// Otherwise it is large so perform a modified binary search to find
// the target ranges for the keys that we should be looking for.
strL := make([]string, 0, n)
for key := range store.unsortedCache {
store.unsortedCache.Range(func(key string, _ interface{}) bool {
strL = append(strL, key)
}
return true
})
sort.Strings(strL)

// Now find the values within the domain
Expand Down Expand Up @@ -359,23 +339,21 @@ func (store *Store) dirtyItems(start, end []byte) {
kvL := make([]*kv.Pair, 0, 1+endIndex-startIndex)
for i := startIndex; i <= endIndex; i++ {
key := strL[i]
cacheValue := store.cache[key]
kvL = append(kvL, &kv.Pair{Key: []byte(key), Value: cacheValue.value})
cacheValue, _ := store.cache.Load(key)
kvL = append(kvL, &kv.Pair{Key: []byte(key), Value: cacheValue.(*cValue).value})
}

// kvL was already sorted so pass it in as is.
store.clearUnsortedCacheSubset(kvL, stateAlreadySorted)
}

func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sortState) {
n := len(store.unsortedCache)
n := store.unsortedCache.Size()
if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map.
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
store.unsortedCache.Clear()
} else { // Otherwise, normally delete the unsorted keys from the map.
for _, kv := range unsorted {
delete(store.unsortedCache, conv.UnsafeBytesToStr(kv.Key))
store.unsortedCache.Delete(conv.UnsafeBytesToStr(kv.Key))
}
}

Expand All @@ -398,11 +376,15 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
// A `nil` value means a deletion.
func (store *Store) setCacheValue(key, value []byte, dirty bool) {
keyStr := conv.UnsafeBytesToStr(key)
store.cache[keyStr] = &cValue{
if !dirty {
// set if not exists
store.cache.LoadOrStore(keyStr, &cValue{value: value})
}

// force update
store.cache.Store(keyStr, &cValue{
value: value,
dirty: dirty,
}
if dirty {
store.unsortedCache[keyStr] = struct{}{}
}
})
store.unsortedCache.Store(keyStr, struct{}{})
}
2 changes: 2 additions & 0 deletions store/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ require (
gotest.tools/v3 v3.5.1
)

require github.com/puzpuzpuz/xsync/v3 v3.1.0

require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions store/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4=
github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
Expand Down

0 comments on commit e6a0af3

Please sign in to comment.