Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce GetFn and PeekFn methods, for zero-copy access. #92

Merged
merged 1 commit into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) {
return
}

// GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy,
// calling the provided function with slice view over the current underlying
// value of the key in memory. The slice is constrained in length and capacity.
//
// In moth cases, this method will not alloc a byte buffer. The only exception
// is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound is there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, false)
cache.locks[segID].Unlock()
return
}

// GetOrSet returns existing value or if record doesn't exist
// it sets a new key, value and expiration for a cache entry and stores it in the cache, returns nil in that case
func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error) {
Expand All @@ -109,6 +127,24 @@ func (cache *Cache) Peek(key []byte) (value []byte, err error) {
return
}

// PeekFn is equivalent to Peek, but it attempts to be zero-copy, calling the
// provided function with slice view over the current underlying value of the
// key in memory. The slice is constrained in length and capacity.
//
// In moth cases, this method will not alloc a byte buffer. The only exception
// is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound is there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, true)
cache.locks[segID].Unlock()
return
}

// GetWithBuf copies the value to the buf or returns not found error.
// This method doesn't allocate memory when the capacity of buf is greater or equal to value.
func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error) {
Expand Down
37 changes: 36 additions & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestFreeCache(t *testing.T) {
t.Errorf("value is %v, expected %v", string(value), expectedValStr)
}
}
err = cache.GetFn([]byte(keyStr), func(val []byte) error {
if string(val) != expectedValStr {
t.Errorf("getfn: value is %v, expected %v", string(val), expectedValStr)
}
return nil
})
}

t.Logf("hit rate is %v, evacuates %v, entries %v, average time %v, expire count %v\n",
Expand Down Expand Up @@ -156,6 +162,15 @@ func TestGetOrSet(t *testing.T) {
if err != nil || string(r) != "efgh" {
t.Errorf("Expected to get old record, got: value=%v, err=%v", string(r), err)
}
err = cache.GetFn(key, func(val []byte) error {
if string(val) != "efgh" {
t.Errorf("getfn: Expected to get old record, got: value=%v, err=%v", string(r), err)
}
return nil
})
if err != nil {
t.Errorf("did not expect error from GetFn, got: %s", err)
}
}

func TestGetWithExpiration(t *testing.T) {
Expand Down Expand Up @@ -663,7 +678,7 @@ func BenchmarkCacheGet(b *testing.B) {
}
}

func BenchmarkParallelCacheGet(b *testing.B) {
func BenchmarkCacheGetFn(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
cache := NewCache(256 * 1024 * 1024)
Expand All @@ -674,7 +689,27 @@ func BenchmarkParallelCacheGet(b *testing.B) {
cache.Set(key[:], buf, 0)
}
b.StartTimer()
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(key[:], uint64(i))
_ = cache.GetFn(key[:], func(val []byte) error {
_ = val
return nil
})
}
b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate())
}

func BenchmarkParallelCacheGet(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
cache := NewCache(256 * 1024 * 1024)
buf := make([]byte, 64)
var key [8]byte
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(key[:], uint64(i))
cache.Set(key[:], buf, 0)
}
b.StartTimer()
b.RunParallel(func(pb *testing.PB) {
counter := 0
b.ReportAllocs()
Expand Down
30 changes: 30 additions & 0 deletions ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
return
}

// Slice returns a slice of the supplied range of the ring buffer. It will
// not alloc unless the requested range wraps the ring buffer.
func (rb *RingBuf) Slice(off, length int64) ([]byte, error) {
if off > rb.end || off < rb.begin {
return nil, ErrOutOfRange
}
var readOff int
if rb.end-rb.begin < int64(len(rb.data)) {
readOff = int(off - rb.begin)
} else {
readOff = rb.index + int(off-rb.begin)
}
if readOff >= len(rb.data) {
readOff -= len(rb.data)
}
readEnd := readOff + int(length)
if readEnd <= len(rb.data) {
return rb.data[readOff:readEnd:readEnd], nil
}
buf := make([]byte, length)
n := copy(buf, rb.data[readOff:])
if n < int(length) {
n += copy(buf[n:], rb.data[:readEnd-len(rb.data)])
}
if n < int(length) {
return nil, io.EOF
}
return buf, nil
}

func (rb *RingBuf) Write(p []byte) (n int, err error) {
if len(p) > len(rb.data) {
err = ErrOutOfRange
Expand Down
62 changes: 43 additions & 19 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e

slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)

var hdrBuf [ENTRY_HDR_SIZE]byte
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))

slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if match {
matchedPtr := &slot[idx]
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
Expand Down Expand Up @@ -158,7 +157,6 @@ func (seg *segment) touch(key []byte, hashVal uint64, expireSeconds int) (err er

slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)

slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
Expand Down Expand Up @@ -238,6 +236,44 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi
}

func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
hdr, ptr, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
expireAt = hdr.expireAt
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}

// view provides zero-copy access to the element's value, without copying to
// an intermediate buffer.
func (seg *segment) view(key []byte, fn func([]byte) error, hashVal uint64, peek bool) (err error) {
hdr, ptr, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
start := ptr.offset + ENTRY_HDR_SIZE + int64(hdr.keyLen)
val, err := seg.rb.Slice(start, int64(hdr.valLen))
if err != nil {
return err
}
err = fn(val)
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}

func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdr *entryHdr, ptr *entryPtr, err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
Expand All @@ -249,15 +285,13 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt
}
return
}
ptr := &slot[idx]
ptr = &slot[idx]

var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
hdr = (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if !peek {
now := seg.timer.Now()
expireAt = hdr.expireAt

if hdr.expireAt != 0 && hdr.expireAt <= now {
seg.delEntryPtr(slotId, slot, idx)
atomic.AddInt64(&seg.totalExpired, 1)
Expand All @@ -269,17 +303,7 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt
hdr.accessTime = now
seg.rb.WriteAt(hdrBuf[:], ptr.offset)
}
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
return hdr, ptr, err
}

func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
Expand Down