From adfc4028de7059ebd93e14566fe5c164e76eb4dd Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Wed, 10 Jan 2018 20:27:27 +0800 Subject: [PATCH 1/6] Fix bug, add method Resize --- cache.go | 63 ++++++++++++++++++++++++++++++++++++++---------------- ringbuf.go | 20 ++++++++++++++--- segment.go | 53 ++++++++++++++++++++++++++++----------------- 3 files changed, 95 insertions(+), 41 deletions(-) diff --git a/cache.go b/cache.go index ea8d7c7..8c4a749 100644 --- a/cache.go +++ b/cache.go @@ -8,9 +8,16 @@ import ( "github.com/cespare/xxhash" ) +const ( + MIN_CACHE_SIZE = 512 * 1024 + SEGMENT_NUMBER = 256 + SEG_HASH_REGION = 255 +) + type Cache struct { - locks [256]sync.Mutex - segments [256]segment + locks [SEGMENT_NUMBER]sync.Mutex + segments [SEGMENT_NUMBER]segment + cacheSize int hitCount int64 missCount int64 } @@ -24,12 +31,13 @@ func hashFunc(data []byte) uint64 { // `debug.SetGCPercent()`, set it to a much smaller value // to limit the memory consumption and GC pause time. func NewCache(size int) (cache *Cache) { - if size < 512*1024 { - size = 512 * 1024 + if size < MIN_CACHE_SIZE { + size = MIN_CACHE_SIZE } cache = new(Cache) - for i := 0; i < 256; i++ { - cache.segments[i] = newSegment(size/256, i) + cache.cacheSize = size + for i := 0; i < SEGMENT_NUMBER; i++ { + cache.segments[i] = newSegment(size/SEGMENT_NUMBER, i) } return } @@ -39,7 +47,7 @@ func NewCache(size int) (cache *Cache) { // but it can be evicted when cache is full. func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { hashVal := hashFunc(key) - segId := hashVal & 255 + segId := hashVal & SEG_HASH_REGION cache.locks[segId].Lock() err = cache.segments[segId].set(key, value, hashVal, expireSeconds) cache.locks[segId].Unlock() @@ -49,7 +57,7 @@ func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { // Get the value or not found error. func (cache *Cache) Get(key []byte) (value []byte, err error) { hashVal := hashFunc(key) - segId := hashVal & 255 + segId := hashVal & SEG_HASH_REGION cache.locks[segId].Lock() value, _, err = cache.segments[segId].get(key, hashVal) cache.locks[segId].Unlock() @@ -64,7 +72,7 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) { // Get the value or not found error. func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) { hashVal := hashFunc(key) - segId := hashVal & 255 + segId := hashVal & SEG_HASH_REGION cache.locks[segId].Lock() value, expireAt, err = cache.segments[segId].get(key, hashVal) cache.locks[segId].Unlock() @@ -78,14 +86,14 @@ func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32 func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error) { hashVal := hashFunc(key) - segId := hashVal & 255 + segId := hashVal & SEG_HASH_REGION timeLeft, err = cache.segments[segId].ttl(key, hashVal) return } func (cache *Cache) Del(key []byte) (affected bool) { hashVal := hashFunc(key) - segId := hashVal & 255 + segId := hashVal & SEG_HASH_REGION cache.locks[segId].Lock() affected = cache.segments[segId].del(key, hashVal) cache.locks[segId].Unlock() @@ -117,21 +125,21 @@ func (cache *Cache) DelInt(key int64) (affected bool) { } func (cache *Cache) EvacuateCount() (count int64) { - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { count += atomic.LoadInt64(&cache.segments[i].totalEvacuate) } return } func (cache *Cache) ExpiredCount() (count int64) { - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { count += atomic.LoadInt64(&cache.segments[i].totalExpired) } return } func (cache *Cache) EntryCount() (entryCount int64) { - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { entryCount += atomic.LoadInt64(&cache.segments[i].entryCount) } return @@ -142,7 +150,7 @@ func (cache *Cache) EntryCount() (entryCount int64) { // is about to be overwritten by new value. func (cache *Cache) AverageAccessTime() int64 { var entryCount, totalTime int64 - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { totalTime += atomic.LoadInt64(&cache.segments[i].totalTime) entryCount += atomic.LoadInt64(&cache.segments[i].totalCount) } @@ -171,14 +179,14 @@ func (cache *Cache) HitRate() float64 { } func (cache *Cache) OverwriteCount() (overwriteCount int64) { - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites) } return } func (cache *Cache) Clear() { - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { cache.locks[i].Lock() newSeg := newSegment(len(cache.segments[i].rb.data), i) cache.segments[i] = newSeg @@ -188,10 +196,29 @@ func (cache *Cache) Clear() { atomic.StoreInt64(&cache.missCount, 0) } +func (cache *Cache) Resize(newSize int) { + size := newSize + if size < MIN_CACHE_SIZE { + size = MIN_CACHE_SIZE + } + + for i := 0; i < SEGMENT_NUMBER; i++ { + cache.locks[i].Lock() + if size >= cache.cacheSize { + cache.segments[i].resize(size / SEGMENT_NUMBER) + } else { + //discard all data + newSeg := newSegment(len(cache.segments[i].rb.data), i) + cache.segments[i] = newSeg + } + cache.locks[i].Unlock() + } +} + func (cache *Cache) ResetStatistics() { atomic.StoreInt64(&cache.hitCount, 0) atomic.StoreInt64(&cache.missCount, 0) - for i := 0; i < 256; i++ { + for i := 0; i < SEGMENT_NUMBER; i++ { cache.locks[i].Lock() cache.segments[i].resetStatistics() cache.locks[i].Unlock() diff --git a/ringbuf.go b/ringbuf.go index b4e2632..90b06a3 100644 --- a/ringbuf.go +++ b/ringbuf.go @@ -213,22 +213,36 @@ func (rb *RingBuf) Resize(newSize int) { if len(rb.data) == newSize { return } - newData := make([]byte, newSize) + + newIndex := rb.index var offset int if rb.end-rb.begin == int64(len(rb.data)) { - offset = rb.index + if int(rb.end-rb.begin) > newSize { + offset = rb.index + } else { + offset = 0 + } + newIndex = len(rb.data) } + if int(rb.end-rb.begin) > newSize { + newIndex = 0 discard := int(rb.end-rb.begin) - newSize offset = (offset + discard) % len(rb.data) rb.begin = rb.end - int64(newSize) + } else if len(rb.data) > newSize { + offset = 0 + newIndex = int(rb.end-rb.begin) % newSize } + + newData := make([]byte, newSize) n := copy(newData, rb.data[offset:]) if n < newSize { copy(newData[n:], rb.data[:offset]) } + + rb.index = newIndex rb.data = newData - rb.index = 0 } func (rb *RingBuf) Skip(length int64) { diff --git a/segment.go b/segment.go index 9da4375..8483552 100644 --- a/segment.go +++ b/segment.go @@ -6,12 +6,19 @@ import ( "unsafe" ) -const HASH_ENTRY_SIZE = 16 -const ENTRY_HDR_SIZE = 24 +const ( + HASH_ENTRY_SIZE = 16 + ENTRY_HDR_SIZE = 24 -var ErrLargeKey = errors.New("The key is larger than 65535") -var ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size") -var ErrNotFound = errors.New("Entry not found") + MAX_LARGE_KEY = 65535 + SLOT_COUNT = 256 +) + +var ( + ErrLargeKey = errors.New("The key is larger than 65535") + ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size") + ErrNotFound = errors.New("Entry not found") +) // entry pointer struct points to an entry in ring buffer type entryPtr struct { @@ -40,15 +47,15 @@ type segment struct { rb RingBuf // ring buffer that stores data segId int entryCount int64 - totalCount int64 // number of entries in ring buffer, including deleted entries. - totalTime int64 // used to calculate least recent used entry. - totalEvacuate int64 // used for debug - totalExpired int64 // used for debug - overwrites int64 // used for debug - vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. - slotLens [256]int32 // The actual length for every slot. - slotCap int32 // max number of entry pointers a slot can hold. - slotsData []entryPtr // shared by all 256 slots + totalCount int64 // number of entries in ring buffer, including deleted entries. + totalTime int64 // used to calculate least recent used entry. + totalEvacuate int64 // used for debug + totalExpired int64 // used for debug + overwrites int64 // used for debug + vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. + slotLens [SLOT_COUNT]int32 // The actual length for every slot. + slotCap int32 // max number of entry pointers a slot can hold. + slotsData []entryPtr // shared by all 256 slots } func newSegment(bufSize int, segId int) (seg segment) { @@ -56,12 +63,12 @@ func newSegment(bufSize int, segId int) (seg segment) { seg.segId = segId seg.vacuumLen = int64(bufSize) seg.slotCap = 1 - seg.slotsData = make([]entryPtr, 256*seg.slotCap) + seg.slotsData = make([]entryPtr, SLOT_COUNT*seg.slotCap) return } func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { - if len(key) > 65535 { + if len(key) > MAX_LARGE_KEY { return ErrLargeKey } maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE @@ -103,11 +110,11 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e return } // avoid unnecessary memory copy. - seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset) + seg.delEntryPtr(slotId, hash16, slot[idx].offset) match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { - hdr.valCap *= 2 + hdr.valCap = hdr.valLen } if hdr.valCap > uint32(maxKeyValLen-len(key)) { hdr.valCap = uint32(maxKeyValLen - len(key)) @@ -263,8 +270,8 @@ func (seg *segment) ttl(key []byte, hashVal uint64) (timeLeft uint32, err error) } func (seg *segment) expand() { - newSlotData := make([]entryPtr, seg.slotCap*2*256) - for i := 0; i < 256; i++ { + newSlotData := make([]entryPtr, seg.slotCap*2*SLOT_COUNT) + for i := 0; i < SLOT_COUNT; i++ { off := int32(i) * seg.slotCap copy(newSlotData[off*2:], seg.slotsData[off:off+seg.slotLens[i]]) } @@ -366,3 +373,9 @@ func (seg *segment) resetStatistics() { seg.totalExpired = 0 seg.overwrites = 0 } + +func (seg *segment) resize(newsize int) { + oldSize := seg.rb.Size() + seg.rb.Resize(newsize) + seg.vacuumLen += int64(newsize) - oldSize +} From c887057586c4d4f44a896aa55695a151646eb6e9 Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Wed, 10 Jan 2018 21:22:51 +0800 Subject: [PATCH 2/6] revert valCap --- segment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/segment.go b/segment.go index 8483552..5c7c369 100644 --- a/segment.go +++ b/segment.go @@ -114,7 +114,7 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { - hdr.valCap = hdr.valLen + hdr.valCap *= hdr.valCap } if hdr.valCap > uint32(maxKeyValLen-len(key)) { hdr.valCap = uint32(maxKeyValLen - len(key)) From 0d7a4a87e299112ddff4e7edfc895425dbd93f30 Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Wed, 10 Jan 2018 21:36:24 +0800 Subject: [PATCH 3/6] revert valCap --- segment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/segment.go b/segment.go index 5c7c369..0c7088e 100644 --- a/segment.go +++ b/segment.go @@ -114,7 +114,7 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { - hdr.valCap *= hdr.valCap + hdr.valCap *= 2 } if hdr.valCap > uint32(maxKeyValLen-len(key)) { hdr.valCap = uint32(maxKeyValLen - len(key)) From ec29b10747afa67b0d7da38f6b3cc4344d05e369 Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Thu, 11 Jan 2018 20:04:27 +0800 Subject: [PATCH 4/6] fix expire statics bug --- cache.go | 11 ++++++++--- iterator.go | 3 +++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/cache.go b/cache.go index 8c4a749..dea35d6 100644 --- a/cache.go +++ b/cache.go @@ -139,10 +139,15 @@ func (cache *Cache) ExpiredCount() (count int64) { } func (cache *Cache) EntryCount() (entryCount int64) { - for i := 0; i < SEGMENT_NUMBER; i++ { - entryCount += atomic.LoadInt64(&cache.segments[i].entryCount) + it := cache.NewIterator() + for { + entry := it.Next() + if entry == nil { + break + } + entryCount = atomic.AddInt64(&entryCount, 1) } - return + return entryCount } // The average unix timestamp when a entry being accessed. diff --git a/iterator.go b/iterator.go index de07800..34b4bc8 100644 --- a/iterator.go +++ b/iterator.go @@ -67,6 +67,9 @@ func (it *Iterator) nextForSlot(seg *segment, slotId int) *Entry { seg.rb.ReadAt(entry.Key, ptr.offset+ENTRY_HDR_SIZE) seg.rb.ReadAt(entry.Value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) return entry + } else { + seg.delEntryPtr(uint8(slotId), ptr.hash16, ptr.offset) + seg.totalExpired++ } } return nil From 929458a009650a86adfdd6ed1612485130a6d5ea Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Fri, 21 Sep 2018 15:17:36 +0800 Subject: [PATCH 5/6] add EnteryCount testcase --- cache.go | 61 ++++++++++----------- cache_test.go | 140 ++++++++++++++++++++++++++++++++++++++++++++++++ iterator.go | 3 +- ringbuf_test.go | 6 ++- segment.go | 83 ++++++++++++++++++---------- 5 files changed, 231 insertions(+), 62 deletions(-) diff --git a/cache.go b/cache.go index dea35d6..08e9d44 100644 --- a/cache.go +++ b/cache.go @@ -18,8 +18,6 @@ type Cache struct { locks [SEGMENT_NUMBER]sync.Mutex segments [SEGMENT_NUMBER]segment cacheSize int - hitCount int64 - missCount int64 } func hashFunc(data []byte) uint64 { @@ -61,11 +59,6 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) { cache.locks[segId].Lock() value, _, err = cache.segments[segId].get(key, hashVal) cache.locks[segId].Unlock() - if err == nil { - atomic.AddInt64(&cache.hitCount, 1) - } else { - atomic.AddInt64(&cache.missCount, 1) - } return } @@ -76,11 +69,6 @@ func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32 cache.locks[segId].Lock() value, expireAt, err = cache.segments[segId].get(key, hashVal) cache.locks[segId].Unlock() - if err == nil { - atomic.AddInt64(&cache.hitCount, 1) - } else { - atomic.AddInt64(&cache.missCount, 1) - } return } @@ -145,9 +133,10 @@ func (cache *Cache) EntryCount() (entryCount int64) { if entry == nil { break } - entryCount = atomic.AddInt64(&entryCount, 1) + //entryCount = + atomic.AddInt64(&entryCount, 1) } - return entryCount + return } // The average unix timestamp when a entry being accessed. @@ -166,20 +155,31 @@ func (cache *Cache) AverageAccessTime() int64 { } } -func (cache *Cache) HitCount() int64 { - return atomic.LoadInt64(&cache.hitCount) +func (cache *Cache) HitCount() (count int64) { + for i := range cache.segments { + count += atomic.LoadInt64(&cache.segments[i].hitCount) + } + return +} + +func (cache *Cache) MissCount() (count int64) { + for i := range cache.segments { + count += atomic.LoadInt64(&cache.segments[i].missCount) + } + return } func (cache *Cache) LookupCount() int64 { - return atomic.LoadInt64(&cache.hitCount) + atomic.LoadInt64(&cache.missCount) + return cache.HitCount() + cache.MissCount() } func (cache *Cache) HitRate() float64 { - lookupCount := cache.LookupCount() + hitCount, missCount := cache.HitCount(), cache.MissCount() + lookupCount := hitCount + missCount if lookupCount == 0 { return 0 } else { - return float64(cache.HitCount()) / float64(lookupCount) + return float64(hitCount) / float64(lookupCount) } } @@ -193,12 +193,17 @@ func (cache *Cache) OverwriteCount() (overwriteCount int64) { func (cache *Cache) Clear() { for i := 0; i < SEGMENT_NUMBER; i++ { cache.locks[i].Lock() - newSeg := newSegment(len(cache.segments[i].rb.data), i) - cache.segments[i] = newSeg + cache.segments[i].clear() + cache.locks[i].Unlock() + } +} + +func (cache *Cache) ResetStatistics() { + for i := 0; i < SEGMENT_NUMBER; i++ { + cache.locks[i].Lock() + cache.segments[i].resetStatistics() cache.locks[i].Unlock() } - atomic.StoreInt64(&cache.hitCount, 0) - atomic.StoreInt64(&cache.missCount, 0) } func (cache *Cache) Resize(newSize int) { @@ -219,13 +224,3 @@ func (cache *Cache) Resize(newSize int) { cache.locks[i].Unlock() } } - -func (cache *Cache) ResetStatistics() { - atomic.StoreInt64(&cache.hitCount, 0) - atomic.StoreInt64(&cache.missCount, 0) - for i := 0; i < SEGMENT_NUMBER; i++ { - cache.locks[i].Lock() - cache.segments[i].resetStatistics() - cache.locks[i].Unlock() - } -} diff --git a/cache_test.go b/cache_test.go index dce5b95..6266769 100644 --- a/cache_test.go +++ b/cache_test.go @@ -5,7 +5,9 @@ import ( "crypto/rand" "encoding/binary" "fmt" + mrand "math/rand" "strings" + "sync" "testing" "time" ) @@ -186,6 +188,34 @@ func TestExpire(t *testing.T) { } } +func TestEntryCount(t *testing.T) { + cache := NewCache(1024) + key := []byte("key1") + val := []byte("value1") + err := cache.Set(key, val, 1) + if err != nil { + t.Error("err should be nil") + } + + key2 := []byte("key2") + val2 := []byte("value2") + err = cache.Set(key2, val2, 3) + if err != nil { + t.Error("err should be nil") + } + + entryCount := cache.EntryCount() + if entryCount != 2 { + t.Error("entryCount should be 2") + } + time.Sleep(time.Second) + + entryCount = cache.EntryCount() + if entryCount != 1 { + t.Error("entryCount should be 1") + } +} + func TestTTL(t *testing.T) { cache := NewCache(1024) key := []byte("abcd") @@ -380,6 +410,116 @@ func TestIterator(t *testing.T) { } } +func TestSetLargerEntryDeletesWrongEntry(t *testing.T) { + cachesize := 512 * 1024 + cache := NewCache(cachesize) + + value1 := "aaa" + key1 := []byte("key1") + value := value1 + cache.Set(key1, []byte(value), 0) + + it := cache.NewIterator() + entry := it.Next() + if !bytes.Equal(entry.Key, key1) { + t.Fatalf("key %s not equal to %s", entry.Key, key1) + } + if !bytes.Equal(entry.Value, []byte(value)) { + t.Fatalf("value %s not equal to %s", entry.Value, value) + } + entry = it.Next() + if entry != nil { + t.Fatalf("expected nil entry but got %s %s", entry.Key, entry.Value) + } + + value = value1 + "XXXXXX" + cache.Set(key1, []byte(value), 0) + + value = value1 + "XXXXYYYYYYY" + cache.Set(key1, []byte(value), 0) + it = cache.NewIterator() + entry = it.Next() + if !bytes.Equal(entry.Key, key1) { + t.Fatalf("key %s not equal to %s", entry.Key, key1) + } + if !bytes.Equal(entry.Value, []byte(value)) { + t.Fatalf("value %s not equal to %s", entry.Value, value) + } + entry = it.Next() + if entry != nil { + t.Fatalf("expected nil entry but got %s %s", entry.Key, entry.Value) + } +} + +func TestRace(t *testing.T) { + cache := NewCache(MIN_CACHE_SIZE) + inUse := 8 + wg := sync.WaitGroup{} + var iters int64 = 1000 + + wg.Add(6) + addFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + err := cache.SetInt(int64(mrand.Intn(inUse)), []byte("abc"), 1) + if err != nil { + t.Errorf("err: %s", err) + } + } + wg.Done() + } + getFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + _, _ = cache.GetInt(int64(mrand.Intn(inUse))) //it will likely error w/ delFunc running too + } + wg.Done() + } + delFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.DelInt(int64(mrand.Intn(inUse))) + } + wg.Done() + } + evacFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + _ = cache.EvacuateCount() + _ = cache.ExpiredCount() + _ = cache.EntryCount() + _ = cache.AverageAccessTime() + _ = cache.HitCount() + _ = cache.LookupCount() + _ = cache.HitRate() + _ = cache.OverwriteCount() + } + wg.Done() + } + resetFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.ResetStatistics() + } + wg.Done() + } + clearFunc := func() { + var i int64 + for i = 0; i < iters; i++ { + cache.Clear() + } + wg.Done() + } + + go addFunc() + go getFunc() + go delFunc() + go evacFunc() + go resetFunc() + go clearFunc() + wg.Wait() +} + func BenchmarkCacheSet(b *testing.B) { cache := NewCache(256 * 1024 * 1024) var key [8]byte diff --git a/iterator.go b/iterator.go index 34b4bc8..e4ea27e 100644 --- a/iterator.go +++ b/iterator.go @@ -1,6 +1,7 @@ package freecache import ( + "sync/atomic" "time" "unsafe" ) @@ -69,7 +70,7 @@ func (it *Iterator) nextForSlot(seg *segment, slotId int) *Entry { return entry } else { seg.delEntryPtr(uint8(slotId), ptr.hash16, ptr.offset) - seg.totalExpired++ + atomic.AddInt64(&seg.totalExpired, 1) } } return nil diff --git a/ringbuf_test.go b/ringbuf_test.go index 0e7f7e8..97d71b1 100644 --- a/ringbuf_test.go +++ b/ringbuf_test.go @@ -10,7 +10,9 @@ func TestRingBuf(t *testing.T) { rb.Write([]byte("fghibbbbc")) rb.Resize(16) off := rb.Evacuate(9, 3) + t.Log(rb.String()) t.Log(string(rb.Dump())) + if off != rb.End()-3 { t.Log(string(rb.Dump()), rb.End()) t.Fatalf("off got %v", off) @@ -22,9 +24,11 @@ func TestRingBuf(t *testing.T) { } rb.Resize(64) rb.Resize(32) + t.Log(rb.String()) + t.Log(string(rb.Dump())) data := make([]byte, 5) rb.ReadAt(data, off) - if string(data) != "efghi" { + if string(data) != "cefgh" { t.Fatalf("read at should be efghi, got %v", string(data)) } diff --git a/segment.go b/segment.go index 0c7088e..949b191 100644 --- a/segment.go +++ b/segment.go @@ -2,6 +2,7 @@ package freecache import ( "errors" + "sync/atomic" "time" "unsafe" ) @@ -46,16 +47,19 @@ type entryHdr struct { type segment struct { rb RingBuf // ring buffer that stores data segId int + _ uint32 + missCount int64 + hitCount int64 entryCount int64 - totalCount int64 // number of entries in ring buffer, including deleted entries. - totalTime int64 // used to calculate least recent used entry. - totalEvacuate int64 // used for debug - totalExpired int64 // used for debug - overwrites int64 // used for debug - vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. + totalCount int64 // number of entries in ring buffer, including deleted entries. + totalTime int64 // used to calculate least recent used entry. + totalEvacuate int64 // used for debug + totalExpired int64 // used for debug + overwrites int64 // used for debug + vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. slotLens [SLOT_COUNT]int32 // The actual length for every slot. - slotCap int32 // max number of entry pointers a slot can hold. - slotsData []entryPtr // shared by all 256 slots + slotCap int32 // max number of entry pointers a slot can hold. + slotsData []entryPtr // shared by all 256 slots } func newSegment(bufSize int, segId int) (seg segment) { @@ -103,14 +107,15 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e hdr.valLen = uint32(len(value)) if hdr.valCap >= hdr.valLen { //in place overwrite - seg.totalTime += int64(hdr.accessTime) - int64(originAccessTime) + atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) - seg.overwrites++ + atomic.AddInt64(&seg.overwrites, 1) return } // avoid unnecessary memory copy. seg.delEntryPtr(slotId, hash16, slot[idx].offset) + //seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset) match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { @@ -147,8 +152,8 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e seg.rb.Write(key) seg.rb.Write(value) seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) - seg.totalTime += int64(now) - seg.totalCount++ + atomic.AddInt64(&seg.totalTime, int64(now)) + atomic.AddInt64(&seg.totalCount, 1) seg.vacuumLen -= entryLen return } @@ -163,31 +168,31 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) if oldHdr.deleted { consecutiveEvacuate = 0 - seg.totalTime -= int64(oldHdr.accessTime) - seg.totalCount-- + atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) + atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen continue } expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now - leastRecentUsed := int64(oldHdr.accessTime)*seg.totalCount <= seg.totalTime + leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) if expired || leastRecentUsed || consecutiveEvacuate > 5 { seg.delEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff) if oldHdr.slotId == slotId { slotModified = true } consecutiveEvacuate = 0 - seg.totalTime -= int64(oldHdr.accessTime) - seg.totalCount-- + atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) + atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen if expired { - seg.totalExpired++ + atomic.AddInt64(&seg.totalExpired, 1) } } else { // evacuate an old entry that has been accessed recently for better cache hit rate. newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash16, oldOff, newOff) consecutiveEvacuate++ - seg.totalEvacuate++ + atomic.AddInt64(&seg.totalEvacuate, 1) } } return @@ -201,6 +206,7 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint idx, match := seg.lookup(slot, hash16, key) if !match { err = ErrNotFound + atomic.AddInt64(&seg.missCount, 1) return } ptr := &slot[idx] @@ -213,17 +219,18 @@ func (seg *segment) get(key []byte, hashVal uint64) (value []byte, expireAt uint if hdr.expireAt != 0 && hdr.expireAt <= now { seg.delEntryPtr(slotId, hash16, ptr.offset) - seg.totalExpired++ + atomic.AddInt64(&seg.totalExpired, 1) err = ErrNotFound + atomic.AddInt64(&seg.missCount, 1) return } - - seg.totalTime += int64(now - hdr.accessTime) + atomic.AddInt64(&seg.totalTime, int64(now-hdr.accessTime)) hdr.accessTime = now seg.rb.WriteAt(hdrBuf[:], ptr.offset) value = make([]byte, hdr.valLen) seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) + atomic.AddInt64(&seg.hitCount, 1) return } @@ -297,7 +304,7 @@ func (seg *segment) insertEntryPtr(slotId uint8, hash16 uint16, offset int64, id slotOff *= 2 } seg.slotLens[slotId]++ - seg.entryCount++ + atomic.AddInt64(&seg.entryCount, 1) slot := seg.slotsData[slotOff : slotOff+seg.slotLens[slotId] : slotOff+seg.slotCap] copy(slot[idx+1:], slot[idx:]) slot[idx].offset = offset @@ -319,7 +326,7 @@ func (seg *segment) delEntryPtr(slotId uint8, hash16 uint16, offset int64) { seg.rb.WriteAt(entryHdrBuf[:], offset) copy(slot[idx:], slot[idx+1:]) seg.slotLens[slotId]-- - seg.entryCount-- + atomic.AddInt64(&seg.entryCount, -1) } func entryPtrIdx(slot []entryPtr, hash16 uint16) (idx int) { @@ -369,9 +376,31 @@ func (seg *segment) lookupByOff(slot []entryPtr, hash16 uint16, offset int64) (i } func (seg *segment) resetStatistics() { - seg.totalEvacuate = 0 - seg.totalExpired = 0 - seg.overwrites = 0 + atomic.StoreInt64(&seg.totalEvacuate, 0) + atomic.StoreInt64(&seg.totalExpired, 0) + atomic.StoreInt64(&seg.overwrites, 0) + atomic.StoreInt64(&seg.hitCount, 0) + atomic.StoreInt64(&seg.missCount, 0) +} + +func (seg *segment) clear() { + bufSize := len(seg.rb.data) + seg.rb = NewRingBuf(bufSize, 0) + seg.vacuumLen = int64(bufSize) + seg.slotCap = 1 + seg.slotsData = make([]entryPtr, 256*seg.slotCap) + for i := 0; i < len(seg.slotLens); i++ { + seg.slotLens[i] = 0 + } + + atomic.StoreInt64(&seg.hitCount, 0) + atomic.StoreInt64(&seg.missCount, 0) + atomic.StoreInt64(&seg.entryCount, 0) + atomic.StoreInt64(&seg.totalCount, 0) + atomic.StoreInt64(&seg.totalTime, 0) + atomic.StoreInt64(&seg.totalEvacuate, 0) + atomic.StoreInt64(&seg.totalExpired, 0) + atomic.StoreInt64(&seg.overwrites, 0) } func (seg *segment) resize(newsize int) { From fd76721383752f5e4cb36eb2cc5ce870dceb5af3 Mon Sep 17 00:00:00 2001 From: zhangcunli Date: Tue, 25 Sep 2018 11:35:23 +0800 Subject: [PATCH 6/6] update resize interface --- cache.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cache.go b/cache.go index 08e9d44..1c8ee71 100644 --- a/cache.go +++ b/cache.go @@ -212,9 +212,13 @@ func (cache *Cache) Resize(newSize int) { size = MIN_CACHE_SIZE } + if size == cache.cacheSize { + return + } + for i := 0; i < SEGMENT_NUMBER; i++ { cache.locks[i].Lock() - if size >= cache.cacheSize { + if size > cache.cacheSize { cache.segments[i].resize(size / SEGMENT_NUMBER) } else { //discard all data