Skip to content

Commit

Permalink
introduce GetFn and PeekFn methods, for zero-copy access. (#92)
Browse files Browse the repository at this point in the history
This commit introduces two methods GetFn and PeekFn. These methods
provide zero-copy access to cache entries, avoiding allocs unless
absolutely necessary.

They accept a function, and call it with a slice over the current
underlying value of the key in memory. The slice is constrained
in length and capacity. The RingBuf struct has been augmented with
a Slice() method to support these operations.

The only case where these variants allocate is when the value wraps
around the ring buffer.
  • Loading branch information
raulk authored Nov 16, 2020
1 parent cba08da commit b0e1f92
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 20 deletions.
36 changes: 36 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) {
return
}

// GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy,
// calling the provided function with slice view over the current underlying
// value of the key in memory. The slice is constrained in length and capacity.
//
// In moth cases, this method will not alloc a byte buffer. The only exception
// is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound is there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, false)
cache.locks[segID].Unlock()
return
}

// GetOrSet returns existing value or if record doesn't exist
// it sets a new key, value and expiration for a cache entry and stores it in the cache, returns nil in that case
func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error) {
Expand All @@ -109,6 +127,24 @@ func (cache *Cache) Peek(key []byte) (value []byte, err error) {
return
}

// PeekFn is equivalent to Peek, but it attempts to be zero-copy, calling the
// provided function with slice view over the current underlying value of the
// key in memory. The slice is constrained in length and capacity.
//
// In moth cases, this method will not alloc a byte buffer. The only exception
// is when the value wraps around the underlying segment ring buffer.
//
// The method will return ErrNotFound is there's a miss, and the function will
// not be called. Errors returned by the function will be propagated.
func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error) {
hashVal := hashFunc(key)
segID := hashVal & segmentAndOpVal
cache.locks[segID].Lock()
err = cache.segments[segID].view(key, fn, hashVal, true)
cache.locks[segID].Unlock()
return
}

// GetWithBuf copies the value to the buf or returns not found error.
// This method doesn't allocate memory when the capacity of buf is greater or equal to value.
func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error) {
Expand Down
37 changes: 36 additions & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestFreeCache(t *testing.T) {
t.Errorf("value is %v, expected %v", string(value), expectedValStr)
}
}
err = cache.GetFn([]byte(keyStr), func(val []byte) error {
if string(val) != expectedValStr {
t.Errorf("getfn: value is %v, expected %v", string(val), expectedValStr)
}
return nil
})
}

t.Logf("hit rate is %v, evacuates %v, entries %v, average time %v, expire count %v\n",
Expand Down Expand Up @@ -156,6 +162,15 @@ func TestGetOrSet(t *testing.T) {
if err != nil || string(r) != "efgh" {
t.Errorf("Expected to get old record, got: value=%v, err=%v", string(r), err)
}
err = cache.GetFn(key, func(val []byte) error {
if string(val) != "efgh" {
t.Errorf("getfn: Expected to get old record, got: value=%v, err=%v", string(r), err)
}
return nil
})
if err != nil {
t.Errorf("did not expect error from GetFn, got: %s", err)
}
}

func TestGetWithExpiration(t *testing.T) {
Expand Down Expand Up @@ -663,7 +678,7 @@ func BenchmarkCacheGet(b *testing.B) {
}
}

func BenchmarkParallelCacheGet(b *testing.B) {
func BenchmarkCacheGetFn(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
cache := NewCache(256 * 1024 * 1024)
Expand All @@ -674,7 +689,27 @@ func BenchmarkParallelCacheGet(b *testing.B) {
cache.Set(key[:], buf, 0)
}
b.StartTimer()
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(key[:], uint64(i))
_ = cache.GetFn(key[:], func(val []byte) error {
_ = val
return nil
})
}
b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate())
}

func BenchmarkParallelCacheGet(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
cache := NewCache(256 * 1024 * 1024)
buf := make([]byte, 64)
var key [8]byte
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(key[:], uint64(i))
cache.Set(key[:], buf, 0)
}
b.StartTimer()
b.RunParallel(func(pb *testing.PB) {
counter := 0
b.ReportAllocs()
Expand Down
30 changes: 30 additions & 0 deletions ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
return
}

// Slice returns a slice of the supplied range of the ring buffer. It will
// not alloc unless the requested range wraps the ring buffer.
func (rb *RingBuf) Slice(off, length int64) ([]byte, error) {
if off > rb.end || off < rb.begin {
return nil, ErrOutOfRange
}
var readOff int
if rb.end-rb.begin < int64(len(rb.data)) {
readOff = int(off - rb.begin)
} else {
readOff = rb.index + int(off-rb.begin)
}
if readOff >= len(rb.data) {
readOff -= len(rb.data)
}
readEnd := readOff + int(length)
if readEnd <= len(rb.data) {
return rb.data[readOff:readEnd:readEnd], nil
}
buf := make([]byte, length)
n := copy(buf, rb.data[readOff:])
if n < int(length) {
n += copy(buf[n:], rb.data[:readEnd-len(rb.data)])
}
if n < int(length) {
return nil, io.EOF
}
return buf, nil
}

func (rb *RingBuf) Write(p []byte) (n int, err error) {
if len(p) > len(rb.data) {
err = ErrOutOfRange
Expand Down
62 changes: 43 additions & 19 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e

slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)

var hdrBuf [ENTRY_HDR_SIZE]byte
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))

slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if match {
matchedPtr := &slot[idx]
seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
Expand Down Expand Up @@ -158,7 +157,6 @@ func (seg *segment) touch(key []byte, hashVal uint64, expireSeconds int) (err er

slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)

slot := seg.getSlot(slotId)
idx, match := seg.lookup(slot, hash16, key)
if !match {
Expand Down Expand Up @@ -238,6 +236,44 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi
}

func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
hdr, ptr, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
expireAt = hdr.expireAt
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}

// view provides zero-copy access to the element's value, without copying to
// an intermediate buffer.
func (seg *segment) view(key []byte, fn func([]byte) error, hashVal uint64, peek bool) (err error) {
hdr, ptr, err := seg.locate(key, hashVal, peek)
if err != nil {
return
}
start := ptr.offset + ENTRY_HDR_SIZE + int64(hdr.keyLen)
val, err := seg.rb.Slice(start, int64(hdr.valLen))
if err != nil {
return err
}
err = fn(val)
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
}

func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdr *entryHdr, ptr *entryPtr, err error) {
slotId := uint8(hashVal >> 8)
hash16 := uint16(hashVal >> 16)
slot := seg.getSlot(slotId)
Expand All @@ -249,15 +285,13 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt
}
return
}
ptr := &slot[idx]
ptr = &slot[idx]

var hdrBuf [ENTRY_HDR_SIZE]byte
seg.rb.ReadAt(hdrBuf[:], ptr.offset)
hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
hdr = (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
if !peek {
now := seg.timer.Now()
expireAt = hdr.expireAt

if hdr.expireAt != 0 && hdr.expireAt <= now {
seg.delEntryPtr(slotId, slot, idx)
atomic.AddInt64(&seg.totalExpired, 1)
Expand All @@ -269,17 +303,7 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt
hdr.accessTime = now
seg.rb.WriteAt(hdrBuf[:], ptr.offset)
}
if cap(buf) >= int(hdr.valLen) {
value = buf[:hdr.valLen]
} else {
value = make([]byte, hdr.valLen)
}

seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
if !peek {
atomic.AddInt64(&seg.hitCount, 1)
}
return
return hdr, ptr, err
}

func (seg *segment) del(key []byte, hashVal uint64) (affected bool) {
Expand Down

0 comments on commit b0e1f92

Please sign in to comment.