Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[patch] remove cornelk/hashmap due to the slow write performance, and use customized sync.Map generics version #132

Merged
merged 2 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ profile: clean init
mkdir bench
mkdir pprof
\
# go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkChangeOutAllInt_gache -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out
go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkGacheSetBigDataWithTTL -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out
go tool pprof --svg pprof/gache-test.bin pprof/cpu-gache.out > cpu-gache.svg
go tool pprof --svg pprof/gache-test.bin pprof/mem-gache.out > mem-gache.svg
Expand Down
123 changes: 55 additions & 68 deletions gache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unsafe"

"github.com/cornelk/hashmap"
"github.com/kpango/fastime"
"github.com/zeebo/xxh3"
"golang.org/x/sync/singleflight"
)

type (
Expand Down Expand Up @@ -64,10 +62,9 @@ type (
expire int64
l uint64
cancel atomic.Pointer[context.CancelFunc]
expGroup singleflight.Group
expChan chan string
expFunc func(context.Context, string)
shards [slen]*hashmap.Map[string, value[V]]
shards [slen]*Map[string, *value[V]]
}

value[V any] struct {
Expand Down Expand Up @@ -101,23 +98,19 @@ func New[V any](opts ...Option[V]) Gache[V] {
return g
}

func newMap[V any]() (m *hashmap.Map[string, value[V]]) {
m = hashmap.New[string, value[V]]()
m.SetHasher(func(k string) uintptr {
return uintptr(xxh3.HashString(k))
})
return m
func newMap[V any]() (m *Map[string, *value[V]]) {
return new(Map[string, *value[V]])
}

func getShardID(key string) uint64 {
func getShardID(key string) (id uint64) {
if len(key) > 128 {
return xxh3.HashString(key[:128]) & mask
}
return xxh3.HashString(key) & mask
}

// isValid checks expiration of value
func (v *value[V]) isValid() bool {
func (v *value[V]) isValid() (valid bool) {
return v.expire <= 0 || fastime.UnixNanoNow() <= v.expire
}

Expand Down Expand Up @@ -170,8 +163,13 @@ func (g *gache[V]) StartExpired(ctx context.Context, dur time.Duration) Gache[V]
// ToMap returns All Cache Key-Value sync.Map
func (g *gache[V]) ToMap(ctx context.Context) *sync.Map {
m := new(sync.Map)
g.Range(ctx, func(key string, val V, exp int64) bool {
go m.Store(key, val)
var wg sync.WaitGroup
kpango marked this conversation as resolved.
Show resolved Hide resolved
g.Range(ctx, func(key string, val V, exp int64) (ok bool) {
wg.Add(1)
go func() {
m.Store(key, val)
wg.Done()
}()
return true
})

Expand All @@ -182,7 +180,7 @@ func (g *gache[V]) ToMap(ctx context.Context) *sync.Map {
func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V {
m := make(map[string]V, g.Len())
mu := new(sync.Mutex)
g.Range(ctx, func(key string, val V, exp int64) bool {
g.Range(ctx, func(key string, val V, exp int64) (ok bool) {
mu.Lock()
m[key] = val
mu.Unlock()
Expand All @@ -192,30 +190,29 @@ func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V {
}

// get returns value & exists from key
func (g *gache[V]) get(key string) (V, int64, bool) {
var val V
v, ok := g.shards[getShardID(key)].Get(key)
func (g *gache[V]) get(key string) (v V, expire int64, ok bool) {
var val *value[V]
val, ok = g.shards[getShardID(key)].Load(key)
if !ok {
return val, 0, false
return v, 0, false
}

if v.isValid() {
val = v.val
return val, v.expire, true
if val.isValid() {
return val.val, val.expire, true
}

g.expiration(key)
return val, v.expire, false
return v, val.expire, false
}

// Get returns value & exists from key
func (g *gache[V]) Get(key string) (V, bool) {
v, _, ok := g.get(key)
func (g *gache[V]) Get(key string) (v V, ok bool) {
v, _, ok = g.get(key)
return v, ok
}

// GetWithExpire returns value & expire & exists from key
func (g *gache[V]) GetWithExpire(key string) (V, int64, bool) {
func (g *gache[V]) GetWithExpire(key string) (v V, expire int64, ok bool) {
return g.get(key)
}

Expand All @@ -224,11 +221,13 @@ func (g *gache[V]) set(key string, val V, expire int64) {
if expire > 0 {
expire = fastime.UnixNanoNow() + expire
}
atomic.AddUint64(&g.l, 1)
g.shards[getShardID(key)].Set(key, value[V]{
_, loaded := g.shards[getShardID(key)].Swap(key, &value[V]{
expire: expire,
val: val,
})
if !loaded {
atomic.AddUint64(&g.l, 1)
}
}

// SetWithExpire sets key-value & expiration to Gache
Expand All @@ -243,41 +242,39 @@ func (g *gache[V]) Set(key string, val V) {

// Delete deletes value from Gache using key
func (g *gache[V]) Delete(key string) (loaded bool) {
atomic.AddUint64(&g.l, ^uint64(0))
return g.shards[getShardID(key)].Del(key)
_, loaded = g.shards[getShardID(key)].LoadAndDelete(key)
if loaded {
atomic.AddUint64(&g.l, ^uint64(0))
}
return
}

func (g *gache[V]) expiration(key string) {
g.expGroup.Do(key, func() (interface{}, error) {
g.Delete(key)
if g.expFuncEnabled {
g.expChan <- key
}
return nil, nil
})
g.Delete(key)
if g.expFuncEnabled {
g.expChan <- key
}
}

// DeleteExpired deletes expired value from Gache it can be cancel using context
func (g *gache[V]) DeleteExpired(ctx context.Context) uint64 {
wg := new(sync.WaitGroup)
var rows uint64
func (g *gache[V]) DeleteExpired(ctx context.Context) (rows uint64) {
var wg sync.WaitGroup
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
g.shards[idx].Range(func(k string, v value[V]) bool {
select {
case <-c.Done():
return false
default:
defer wg.Done()
select {
case <-c.Done():
return
default:
g.shards[idx].Range(func(k string, v *value[V]) (ok bool) {
if !v.isValid() {
g.expiration(k)
atomic.AddUint64(&rows, 1)
runtime.Gosched()
}
return true
}
})
wg.Done()
})
}
}(ctx, i)
}
wg.Wait()
Expand All @@ -290,20 +287,19 @@ func (g *gache[V]) Range(ctx context.Context, f func(string, V, int64) bool) Gac
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
g.shards[idx].Range(func(k string, v value[V]) bool {
select {
case <-c.Done():
return false
default:
defer wg.Done()
select {
case <-c.Done():
return
default:
g.shards[idx].Range(func(k string, v *value[V]) (ok bool) {
if v.isValid() {
return f(k, v.val, v.expire)
}
runtime.Gosched()
g.expiration(k)
return true
}
})
wg.Done()
})
}
}(ctx, i)
}
wg.Wait()
Expand All @@ -318,16 +314,7 @@ func (g *gache[V]) Len() int {

// Write writes all cached data to writer
func (g *gache[V]) Write(ctx context.Context, w io.Writer) error {
mu := new(sync.Mutex)
m := make(map[string]V, g.Len())

g.Range(ctx, func(key string, val V, exp int64) bool {
gob.Register(val)
mu.Lock()
m[key] = val
mu.Unlock()
return true
})
m := g.ToRawMap(ctx)
gob.Register(map[string]V{})
return gob.NewEncoder(w).Encode(&m)
}
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ module github.com/kpango/gache/v2
go 1.20

require (
github.com/cornelk/hashmap v1.0.8
github.com/kpango/fastime v1.1.9
github.com/kpango/glg v1.6.15
github.com/zeebo/xxh3 v1.0.2
golang.org/x/sync v0.1.0
)

require (
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand All @@ -15,6 +13,4 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
Loading