diff --git a/Makefile b/Makefile index cfadc31..74ca799 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,7 @@ profile: clean init mkdir bench mkdir pprof \ + # go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkChangeOutAllInt_gache -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out go test -count=3 -timeout=30m -run=NONE -bench=BenchmarkGacheSetBigDataWithTTL -benchmem -o pprof/gache-test.bin -cpuprofile pprof/cpu-gache.out -memprofile pprof/mem-gache.out go tool pprof --svg pprof/gache-test.bin pprof/cpu-gache.out > cpu-gache.svg go tool pprof --svg pprof/gache-test.bin pprof/mem-gache.out > mem-gache.svg diff --git a/gache.go b/gache.go index 31e9e87..c046137 100644 --- a/gache.go +++ b/gache.go @@ -10,10 +10,8 @@ import ( "time" "unsafe" - "github.com/cornelk/hashmap" "github.com/kpango/fastime" "github.com/zeebo/xxh3" - "golang.org/x/sync/singleflight" ) type ( @@ -64,10 +62,9 @@ type ( expire int64 l uint64 cancel atomic.Pointer[context.CancelFunc] - expGroup singleflight.Group expChan chan string expFunc func(context.Context, string) - shards [slen]*hashmap.Map[string, value[V]] + shards [slen]*Map[string, *value[V]] } value[V any] struct { @@ -101,15 +98,11 @@ func New[V any](opts ...Option[V]) Gache[V] { return g } -func newMap[V any]() (m *hashmap.Map[string, value[V]]) { - m = hashmap.New[string, value[V]]() - m.SetHasher(func(k string) uintptr { - return uintptr(xxh3.HashString(k)) - }) - return m +func newMap[V any]() (m *Map[string, *value[V]]) { + return new(Map[string, *value[V]]) } -func getShardID(key string) uint64 { +func getShardID(key string) (id uint64) { if len(key) > 128 { return xxh3.HashString(key[:128]) & mask } @@ -117,7 +110,7 @@ func getShardID(key string) uint64 { } // isValid checks expiration of value -func (v *value[V]) isValid() bool { +func (v *value[V]) isValid() (valid bool) { return v.expire <= 0 || fastime.UnixNanoNow() <= v.expire } @@ -170,8 +163,14 @@ func (g *gache[V]) StartExpired(ctx context.Context, dur time.Duration) Gache[V] // ToMap returns All Cache Key-Value sync.Map func (g *gache[V]) ToMap(ctx context.Context) *sync.Map { m := new(sync.Map) - g.Range(ctx, func(key string, val V, exp int64) bool { - go m.Store(key, val) + var wg sync.WaitGroup + defer wg.Wait() + g.Range(ctx, func(key string, val V, exp int64) (ok bool) { + wg.Add(1) + go func() { + m.Store(key, val) + wg.Done() + }() return true }) @@ -182,7 +181,7 @@ func (g *gache[V]) ToMap(ctx context.Context) *sync.Map { func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V { m := make(map[string]V, g.Len()) mu := new(sync.Mutex) - g.Range(ctx, func(key string, val V, exp int64) bool { + g.Range(ctx, func(key string, val V, exp int64) (ok bool) { mu.Lock() m[key] = val mu.Unlock() @@ -192,30 +191,29 @@ func (g *gache[V]) ToRawMap(ctx context.Context) map[string]V { } // get returns value & exists from key -func (g *gache[V]) get(key string) (V, int64, bool) { - var val V - v, ok := g.shards[getShardID(key)].Get(key) +func (g *gache[V]) get(key string) (v V, expire int64, ok bool) { + var val *value[V] + val, ok = g.shards[getShardID(key)].Load(key) if !ok { - return val, 0, false + return v, 0, false } - if v.isValid() { - val = v.val - return val, v.expire, true + if val.isValid() { + return val.val, val.expire, true } g.expiration(key) - return val, v.expire, false + return v, val.expire, false } // Get returns value & exists from key -func (g *gache[V]) Get(key string) (V, bool) { - v, _, ok := g.get(key) +func (g *gache[V]) Get(key string) (v V, ok bool) { + v, _, ok = g.get(key) return v, ok } // GetWithExpire returns value & expire & exists from key -func (g *gache[V]) GetWithExpire(key string) (V, int64, bool) { +func (g *gache[V]) GetWithExpire(key string) (v V, expire int64, ok bool) { return g.get(key) } @@ -224,11 +222,13 @@ func (g *gache[V]) set(key string, val V, expire int64) { if expire > 0 { expire = fastime.UnixNanoNow() + expire } - atomic.AddUint64(&g.l, 1) - g.shards[getShardID(key)].Set(key, value[V]{ + _, loaded := g.shards[getShardID(key)].Swap(key, &value[V]{ expire: expire, val: val, }) + if !loaded { + atomic.AddUint64(&g.l, 1) + } } // SetWithExpire sets key-value & expiration to Gache @@ -243,41 +243,39 @@ func (g *gache[V]) Set(key string, val V) { // Delete deletes value from Gache using key func (g *gache[V]) Delete(key string) (loaded bool) { - atomic.AddUint64(&g.l, ^uint64(0)) - return g.shards[getShardID(key)].Del(key) + _, loaded = g.shards[getShardID(key)].LoadAndDelete(key) + if loaded { + atomic.AddUint64(&g.l, ^uint64(0)) + } + return } func (g *gache[V]) expiration(key string) { - g.expGroup.Do(key, func() (interface{}, error) { - g.Delete(key) - if g.expFuncEnabled { - g.expChan <- key - } - return nil, nil - }) + g.Delete(key) + if g.expFuncEnabled { + g.expChan <- key + } } // DeleteExpired deletes expired value from Gache it can be cancel using context -func (g *gache[V]) DeleteExpired(ctx context.Context) uint64 { - wg := new(sync.WaitGroup) - var rows uint64 +func (g *gache[V]) DeleteExpired(ctx context.Context) (rows uint64) { + var wg sync.WaitGroup for i := range g.shards { wg.Add(1) go func(c context.Context, idx int) { - g.shards[idx].Range(func(k string, v value[V]) bool { - select { - case <-c.Done(): - return false - default: + defer wg.Done() + select { + case <-c.Done(): + return + default: + g.shards[idx].Range(func(k string, v *value[V]) (ok bool) { if !v.isValid() { g.expiration(k) atomic.AddUint64(&rows, 1) - runtime.Gosched() } return true - } - }) - wg.Done() + }) + } }(ctx, i) } wg.Wait() @@ -290,20 +288,19 @@ func (g *gache[V]) Range(ctx context.Context, f func(string, V, int64) bool) Gac for i := range g.shards { wg.Add(1) go func(c context.Context, idx int) { - g.shards[idx].Range(func(k string, v value[V]) bool { - select { - case <-c.Done(): - return false - default: + defer wg.Done() + select { + case <-c.Done(): + return + default: + g.shards[idx].Range(func(k string, v *value[V]) (ok bool) { if v.isValid() { return f(k, v.val, v.expire) } - runtime.Gosched() g.expiration(k) return true - } - }) - wg.Done() + }) + } }(ctx, i) } wg.Wait() @@ -318,16 +315,7 @@ func (g *gache[V]) Len() int { // Write writes all cached data to writer func (g *gache[V]) Write(ctx context.Context, w io.Writer) error { - mu := new(sync.Mutex) - m := make(map[string]V, g.Len()) - - g.Range(ctx, func(key string, val V, exp int64) bool { - gob.Register(val) - mu.Lock() - m[key] = val - mu.Unlock() - return true - }) + m := g.ToRawMap(ctx) gob.Register(map[string]V{}) return gob.NewEncoder(w).Encode(&m) } diff --git a/go.mod b/go.mod index 96b78ea..90bfa23 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,9 @@ module github.com/kpango/gache/v2 go 1.20 require ( - github.com/cornelk/hashmap v1.0.8 github.com/kpango/fastime v1.1.9 github.com/kpango/glg v1.6.15 github.com/zeebo/xxh3 v1.0.2 - golang.org/x/sync v0.1.0 ) require ( diff --git a/go.sum b/go.sum index 5015428..2298c65 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= -github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= @@ -15,6 +13,4 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= diff --git a/map.go b/map.go new file mode 100644 index 0000000..6554b3c --- /dev/null +++ b/map.go @@ -0,0 +1,348 @@ +package gache + +import ( + "sync" + "sync/atomic" +) + +type Map[K, V comparable] struct { + mu sync.Mutex + read atomic.Pointer[readOnly[K, V]] + dirty map[K]*entry[V] + misses int +} + +type readOnly[K, V comparable] struct { + m map[K]*entry[V] + amended bool +} + +type entry[V comparable] struct { + expunged atomic.Pointer[V] + p atomic.Pointer[V] +} + +func newEntry[V comparable](v V) (e *entry[V]) { + e = &entry[V]{} + e.expunged.Store(new(V)) + e.p.Store(&v) + return e +} + +func (m *Map[K, V]) loadReadOnly() (ro readOnly[K, V]) { + if p := m.read.Load(); p != nil { + return *p + } + return readOnly[K, V]{} +} + +func (m *Map[K, V]) Load(key K) (value V, ok bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return value, false + } + return e.load() +} + +func (e *entry[V]) load() (value V, ok bool) { + p := e.p.Load() + if p == nil || p == e.expunged.Load() { + return value, false + } + return *(*V)(p), true +} + +func (m *Map[K, V]) Store(key K, value V) { + _, _ = m.Swap(key, value) +} + +func (e *entry[V]) tryCompareAndSwap(old, new V) (ok bool) { + p := e.p.Load() + if p == nil || p == e.expunged.Load() || *p != old { + return false + } + + nc := new + for { + if e.p.CompareAndSwap(p, &nc) { + return true + } + p = e.p.Load() + if p == nil || p == e.expunged.Load() || *p != old { + return false + } + } +} + +func (e *entry[V]) unexpungeLocked() (wasExpunged bool) { + return e.p.CompareAndSwap(e.expunged.Load(), nil) +} + +func (e *entry[V]) swapLocked(i *V) (v *V) { + return e.p.Swap(i) +} + +func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read = m.loadReadOnly() + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + + m.dirtyLocked() + m.read.Store(&readOnly[K, V]{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +func (e *entry[V]) tryLoadOrStore(i V) (actual V, loaded, ok bool) { + p := e.p.Load() + if p == e.expunged.Load() { + return actual, false, false + } + if p != nil { + return *p, true, true + } + + ic := i + for { + if e.p.CompareAndSwap(nil, &ic) { + return i, false, true + } + p = e.p.Load() + if p == e.expunged.Load() { + return actual, false, false + } + if p != nil { + return *p, true, true + } + } +} + +func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + delete(m.dirty, key) + m.missLocked() + } + m.mu.Unlock() + } + if ok { + return e.delete() + } + return value, false +} + +func (m *Map[K, V]) Delete(key K) { + m.LoadAndDelete(key) +} + +func (e *entry[V]) delete() (value V, ok bool) { + for { + p := e.p.Load() + if p == nil || p == e.expunged.Load() { + return value, false + } + if e.p.CompareAndSwap(p, nil) { + return *p, true + } + } +} + +func (e *entry[V]) trySwap(i *V) (v *V, ok bool) { + for { + p := e.p.Load() + if p == e.expunged.Load() { + return nil, false + } + if e.p.CompareAndSwap(p, i) { + return p, true + } + } +} + +func (m *Map[K, V]) Swap(key K, value V) (previous V, loaded bool) { + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + if v, ok := e.trySwap(&value); ok { + if v == nil { + return previous, false + } + return *v, true + } + } + + m.mu.Lock() + read = m.loadReadOnly() + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + if v := e.swapLocked(&value); v != nil { + loaded = true + previous = *v + } + } else if e, ok := m.dirty[key]; ok { + if v := e.swapLocked(&value); v != nil { + loaded = true + previous = *v + } + } else { + if !read.amended { + m.dirtyLocked() + m.read.Store(&readOnly[K, V]{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() + return previous, loaded +} + +func (m *Map[K, V]) CompareAndSwap(key K, old, new V) (swapped bool) { + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + return e.tryCompareAndSwap(old, new) + } else if !read.amended { + return false + } + + m.mu.Lock() + defer m.mu.Unlock() + read = m.loadReadOnly() + swapped = false + if e, ok := read.m[key]; ok { + swapped = e.tryCompareAndSwap(old, new) + } else if e, ok := m.dirty[key]; ok { + swapped = e.tryCompareAndSwap(old, new) + + m.missLocked() + } + return swapped +} + +func (m *Map[K, V]) CompareAndDelete(key K, old V) (deleted bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + + m.missLocked() + } + m.mu.Unlock() + } + for ok { + p := e.p.Load() + if p == nil || p == e.expunged.Load() || *p != old { + return false + } + if e.p.CompareAndSwap(p, nil) { + return true + } + } + return false +} + +func (m *Map[K, V]) Range(f func(key K, value V) bool) { + + read := m.loadReadOnly() + if read.amended { + + m.mu.Lock() + read = m.loadReadOnly() + if read.amended { + read = readOnly[K, V]{m: m.dirty} + m.read.Store(&read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map[K, V]) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(&readOnly[K, V]{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map[K, V]) dirtyLocked() { + if m.dirty != nil { + return + } + + read := m.loadReadOnly() + m.dirty = make(map[K]*entry[V], len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry[V]) tryExpungeLocked() (isExpunged bool) { + p := e.p.Load() + for p == nil { + if e.p.CompareAndSwap(nil, e.expunged.Load()) { + return true + } + p = e.p.Load() + } + return p == e.expunged.Load() +}