Skip to content

Commit

Permalink
[patch] remove cornelk/hashmap due to the slow write performance, and…
Browse files Browse the repository at this point in the history
… use customized sync.Map generics version (#132)

* [patch] remove cornelk/hashmap due to the slow write performance, and use original sync.Map generics version

Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango authored Mar 24, 2023
1 parent 0a329cf commit 2fcc8ab
Show file tree
Hide file tree
Showing 5 changed files with 405 additions and 74 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ profile: clean init
mkdir bench
mkdir pprof
\
# go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkChangeOutAllInt_gache -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out
go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkGacheSetBigDataWithTTL -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out
go tool pprof --svg pprof/gache-test.bin pprof/cpu-gache.out > cpu-gache.svg
go tool pprof --svg pprof/gache-test.bin pprof/mem-gache.out > mem-gache.svg
Expand Down
124 changes: 56 additions & 68 deletions gache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unsafe"

"github.com/cornelk/hashmap"
"github.com/kpango/fastime"
"github.com/zeebo/xxh3"
"golang.org/x/sync/singleflight"
)

type (
Expand Down Expand Up @@ -64,10 +62,9 @@ type (
expire int64
l uint64
cancel atomic.Pointer[context.CancelFunc]
expGroup singleflight.Group
expChan chan string
expFunc func(context.Context, string)
shards [slen]*hashmap.Map[string, value[V]]
shards [slen]*Map[string, *value[V]]
}

value[V any] struct {
Expand Down Expand Up @@ -101,23 +98,19 @@ func New[V any](opts ...Option[V]) Gache[V] {
return g
}

func newMap[V any]() (m *hashmap.Map[string, value[V]]) {
m = hashmap.New[string, value[V]]()
m.SetHasher(func(k string) uintptr {
return uintptr(xxh3.HashString(k))
})
return m
func newMap[V any]() (m *Map[string, *value[V]]) {
return new(Map[string, *value[V]])
}

func getShardID(key string) uint64 {
func getShardID(key string) (id uint64) {
if len(key) > 128 {
return xxh3.HashString(key[:128]) & mask
}
return xxh3.HashString(key) & mask
}

// isValid checks expiration of value
func (v *value[V]) isValid() bool {
func (v *value[V]) isValid() (valid bool) {
return v.expire <= 0 || fastime.UnixNanoNow() <= v.expire
}

Expand Down Expand Up @@ -170,8 +163,14 @@ func (g *gache[V]) StartExpired(ctx context.Context, dur time.Duration) Gache[V]
// ToMap returns All Cache Key-Value sync.Map
func (g *gache[V]) ToMap(ctx context.Context) *sync.Map {
m := new(sync.Map)
g.Range(ctx, func(key string, val V, exp int64) bool {
go m.Store(key, val)
var wg sync.WaitGroup
defer wg.Wait()
g.Range(ctx, func(key string, val V, exp int64) (ok bool) {
wg.Add(1)
go func() {
m.Store(key, val)
wg.Done()
}()
return true
})

Expand All @@ -182,7 +181,7 @@ func (g *gache[V]) ToMap(ctx context.Context) *sync.Map {
func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V {
m := make(map[string]V, g.Len())
mu := new(sync.Mutex)
g.Range(ctx, func(key string, val V, exp int64) bool {
g.Range(ctx, func(key string, val V, exp int64) (ok bool) {
mu.Lock()
m[key] = val
mu.Unlock()
Expand All @@ -192,30 +191,29 @@ func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V {
}

// get returns value & exists from key
func (g *gache[V]) get(key string) (V, int64, bool) {
var val V
v, ok := g.shards[getShardID(key)].Get(key)
func (g *gache[V]) get(key string) (v V, expire int64, ok bool) {
var val *value[V]
val, ok = g.shards[getShardID(key)].Load(key)
if !ok {
return val, 0, false
return v, 0, false
}

if v.isValid() {
val = v.val
return val, v.expire, true
if val.isValid() {
return val.val, val.expire, true
}

g.expiration(key)
return val, v.expire, false
return v, val.expire, false
}

// Get returns value & exists from key
func (g *gache[V]) Get(key string) (V, bool) {
v, _, ok := g.get(key)
func (g *gache[V]) Get(key string) (v V, ok bool) {
v, _, ok = g.get(key)
return v, ok
}

// GetWithExpire returns value & expire & exists from key
func (g *gache[V]) GetWithExpire(key string) (V, int64, bool) {
func (g *gache[V]) GetWithExpire(key string) (v V, expire int64, ok bool) {
return g.get(key)
}

Expand All @@ -224,11 +222,13 @@ func (g *gache[V]) set(key string, val V, expire int64) {
if expire > 0 {
expire = fastime.UnixNanoNow() + expire
}
atomic.AddUint64(&g.l, 1)
g.shards[getShardID(key)].Set(key, value[V]{
_, loaded := g.shards[getShardID(key)].Swap(key, &value[V]{
expire: expire,
val: val,
})
if !loaded {
atomic.AddUint64(&g.l, 1)
}
}

// SetWithExpire sets key-value & expiration to Gache
Expand All @@ -243,41 +243,39 @@ func (g *gache[V]) Set(key string, val V) {

// Delete deletes value from Gache using key
func (g *gache[V]) Delete(key string) (loaded bool) {
atomic.AddUint64(&g.l, ^uint64(0))
return g.shards[getShardID(key)].Del(key)
_, loaded = g.shards[getShardID(key)].LoadAndDelete(key)
if loaded {
atomic.AddUint64(&g.l, ^uint64(0))
}
return
}

func (g *gache[V]) expiration(key string) {
g.expGroup.Do(key, func() (interface{}, error) {
g.Delete(key)
if g.expFuncEnabled {
g.expChan <- key
}
return nil, nil
})
g.Delete(key)
if g.expFuncEnabled {
g.expChan <- key
}
}

// DeleteExpired deletes expired value from Gache it can be cancel using context
func (g *gache[V]) DeleteExpired(ctx context.Context) uint64 {
wg := new(sync.WaitGroup)
var rows uint64
func (g *gache[V]) DeleteExpired(ctx context.Context) (rows uint64) {
var wg sync.WaitGroup
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
g.shards[idx].Range(func(k string, v value[V]) bool {
select {
case <-c.Done():
return false
default:
defer wg.Done()
select {
case <-c.Done():
return
default:
g.shards[idx].Range(func(k string, v *value[V]) (ok bool) {
if !v.isValid() {
g.expiration(k)
atomic.AddUint64(&rows, 1)
runtime.Gosched()
}
return true
}
})
wg.Done()
})
}
}(ctx, i)
}
wg.Wait()
Expand All @@ -290,20 +288,19 @@ func (g *gache[V]) Range(ctx context.Context, f func(string, V, int64) bool) Gac
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
g.shards[idx].Range(func(k string, v value[V]) bool {
select {
case <-c.Done():
return false
default:
defer wg.Done()
select {
case <-c.Done():
return
default:
g.shards[idx].Range(func(k string, v *value[V]) (ok bool) {
if v.isValid() {
return f(k, v.val, v.expire)
}
runtime.Gosched()
g.expiration(k)
return true
}
})
wg.Done()
})
}
}(ctx, i)
}
wg.Wait()
Expand All @@ -318,16 +315,7 @@ func (g *gache[V]) Len() int {

// Write writes all cached data to writer
func (g *gache[V]) Write(ctx context.Context, w io.Writer) error {
mu := new(sync.Mutex)
m := make(map[string]V, g.Len())

g.Range(ctx, func(key string, val V, exp int64) bool {
gob.Register(val)
mu.Lock()
m[key] = val
mu.Unlock()
return true
})
m := g.ToRawMap(ctx)
gob.Register(map[string]V{})
return gob.NewEncoder(w).Encode(&m)
}
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ module github.com/kpango/gache/v2
go 1.20

require (
github.com/cornelk/hashmap v1.0.8
github.com/kpango/fastime v1.1.9
github.com/kpango/glg v1.6.15
github.com/zeebo/xxh3 v1.0.2
golang.org/x/sync v0.1.0
)

require (
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand All @@ -15,6 +13,4 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
Loading

0 comments on commit 2fcc8ab

Please sign in to comment.