Skip to content

Commit

Permalink
Persistence API (#19)
Browse files Browse the repository at this point in the history
* add persist API and simple test

* Update go.yaml

* update ci

* more tests

* add version check

* fix lint

* refactor

* update ttl test
  • Loading branch information
Yiling-J authored May 18, 2023
1 parent 850b817 commit 29018ac
Show file tree
Hide file tree
Showing 12 changed files with 733 additions and 44 deletions.
22 changes: 20 additions & 2 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ jobs:
version: v1.52.2
test:
name: test
runs-on: ubuntu-latest
strategy:
matrix:
go: ["1.19.x", "1.20.x"]
runs-on: ubuntu-latest
steps:
- name: Setup Go
with:
Expand All @@ -42,4 +42,22 @@ jobs:
run: make cover

- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v3

test-os:
name: test-os
strategy:
matrix:
go: ["1.19.x", "1.20.x"]
os: [macos-latest, windows-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Setup Go
with:
go-version: ${{ matrix.go }}
uses: actions/setup-go@v2

- uses: actions/checkout@v2

- name: Test
run: go test ./... -run=TestPersistOS
11 changes: 11 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package theine
import (
"context"
"errors"
"io"
"time"

"github.com/Yiling-J/theine-go/internal"
Expand All @@ -12,6 +13,8 @@ const (
ZERO_TTL = 0 * time.Second
)

var VersionMismatch = internal.VersionMismatch

type RemoveReason = internal.RemoveReason

type Loaded[V any] struct {
Expand Down Expand Up @@ -146,6 +149,14 @@ func (c *Cache[K, V]) Close() {
c.store.Close()
}

func (c *Cache[K, V]) SaveCache(version uint64, writer io.Writer) error {
return c.store.Persist(version, writer)
}

func (c *Cache[K, V]) LoadCache(version uint64, reader io.Reader) error {
return c.store.Recover(version, reader)
}

type LoadingCache[K comparable, V any] struct {
store *internal.LoadingStore[K, V]
}
Expand Down
34 changes: 33 additions & 1 deletion internal/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type Entry[K comparable, V any] struct {
cost atomic.Int64
expire atomic.Int64
frequency atomic.Int32
shard uint16
removed bool
deque bool
}
Expand Down Expand Up @@ -123,3 +122,36 @@ func (e *Entry[K, V]) setNext(entry *Entry[K, V], listType uint8) {
e.meta.wheelNext = entry
}
}

func (e *Entry[K, V]) pentry() *Pentry[K, V] {
return &Pentry[K, V]{
Key: e.key,
Value: e.value,
Cost: e.cost.Load(),
Expire: e.expire.Load(),
Frequency: e.frequency.Load(),
Removed: e.removed,
}
}

// entry for persistence
type Pentry[K comparable, V any] struct {
Key K
Value V
Cost int64
Expire int64
Frequency int32
Removed bool
}

func (e *Pentry[K, V]) entry() *Entry[K, V] {
en := &Entry[K, V]{
key: e.Key,
value: e.Value,
removed: e.Removed,
}
en.cost.Store(e.Cost)
en.frequency.Store(e.Frequency)
en.expire.Store(e.Expire)
return en
}
30 changes: 30 additions & 0 deletions internal/list.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package internal

import (
"bytes"
"encoding/gob"
"fmt"
"io"
"strings"
)

Expand Down Expand Up @@ -103,6 +106,11 @@ func (l *List[K, V]) PushFront(e *Entry[K, V]) *Entry[K, V] {
return l.insert(e, &l.root)
}

// Push push entry to the back of list
func (l *List[K, V]) PushBack(e *Entry[K, V]) *Entry[K, V] {
return l.insert(e, l.root.prev(l.listType))
}

// remove removes e from its list, decrements l.len
func (l *List[K, V]) remove(e *Entry[K, V]) {
e.prev(l.listType).setNext(e.next(l.listType), l.listType)
Expand Down Expand Up @@ -183,3 +191,25 @@ func (l *List[K, V]) Contains(entry *Entry[K, V]) bool {
}
return false
}

func (l *List[K, V]) Persist(writer io.Writer, blockEncoder *gob.Encoder, tp uint8) error {
buffer := bytes.NewBuffer(make([]byte, 0, BlockBufferSize))
block := NewBlock[*Pentry[K, V]](tp, buffer, blockEncoder)
for er := l.Front(); er != nil; er = er.Next(l.listType) {
e := er.pentry()
full, err := block.write(e)
if err != nil {
return err
}
if full {
buffer.Reset()
block = NewBlock[*Pentry[K, V]](tp, buffer, blockEncoder)
}
}
err := block.save()
if err != nil {
return err
}
buffer.Reset()
return nil
}
61 changes: 61 additions & 0 deletions internal/persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package internal

import (
"bytes"
"encoding/gob"

"github.com/zeebo/xxh3"
)

const BlockBufferSize = 4 * 1024 * 1024

type DataBlock[V any] struct {
Type uint8
CheckSum uint64
Data []byte
clean bool
buffer *bytes.Buffer // used in entryDecoder
// datablock should share single blockEncoder
// but use separate entryEncoder
blockEncoder *gob.Encoder
entryEncoder *gob.Encoder
}

func NewBlock[V any](tp uint8, buffer *bytes.Buffer, blockEncoder *gob.Encoder) *DataBlock[V] {
return &DataBlock[V]{
Type: tp,
buffer: buffer,
blockEncoder: blockEncoder,
entryEncoder: gob.NewEncoder(buffer),
clean: true,
}
}

func (b *DataBlock[V]) save() error {
if b.clean {
return nil
}
b.clean = true
data := b.buffer.Bytes()
b.CheckSum = xxh3.Hash(data)
b.Data = data
return b.blockEncoder.Encode(b)
}

func (b *DataBlock[V]) write(item V) (full bool, err error) {
err = b.entryEncoder.Encode(item)
if err != nil {
return false, err
}
b.clean = false
if b.buffer.Len() >= BlockBufferSize {
b.clean = true
data := b.buffer.Bytes()
b.CheckSum = xxh3.Hash(data)
b.Data = data
err = b.blockEncoder.Encode(b)
return true, err
}
return false, nil

}
164 changes: 164 additions & 0 deletions internal/persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package internal

import (
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestStorePersistence(t *testing.T) {
store := NewStore[int, int](1000, false)
for i := 0; i < 20; i++ {
_ = store.Set(i, i, 1, 0)
}
time.Sleep(200 * time.Millisecond)
for i := 0; i < 10; i++ {
_, _ = store.Get(i)
}
store.drainRead()
// now 0-9 in protected and 10-19 in probation
require.Equal(t, 10, store.policy.slru.protected.Len())
require.Equal(t, 10, store.policy.slru.probation.Len())
require.Equal(t, "9/8/7/6/5/4/3/2/1/0", store.policy.slru.protected.display())
require.Equal(t, "19/18/17/16/15/14/13/12/11/10", store.policy.slru.probation.display())
// add 5 entries to shard deque
for i := 20; i < 25; i++ {
entry := &Entry[int, int]{
key: i,
value: i,
}
entry.cost.Store(int64(1))
entry.frequency.Store(int32(i))
store.shards[0].deque.PushFront(entry)
}
// update sketch
for i := 0; i < 10; i++ {
_, _ = store.Get(5)
}
store.drainRead()
count := store.policy.sketch.Estimate(store.hasher.hash(5))
require.True(t, count > 5)

f, err := os.Create("stest")
defer os.Remove("stest")
require.Nil(t, err)
err = store.Persist(0, f)
require.Nil(t, err)
f.Close()

new := NewStore[int, int](1000, false)
// manually set deque size of shard
for _, shard := range new.shards {
shard.qsize = 10
}
f, err = os.Open("stest")
require.Nil(t, err)
err = new.Recover(0, f)
require.Nil(t, err)
f.Close()
m := map[int]int{}
new.Range(func(key, value int) bool {
m[key] = value
return true
})
require.Equal(t, 25, len(m))
for k, v := range m {
require.Equal(t, k, v)
}
require.Equal(t, 10, new.policy.slru.protected.Len())
require.Equal(t, 10, new.policy.slru.probation.Len())
require.Equal(t, "5/9/8/7/6/4/3/2/1/0", new.policy.slru.protected.display())
require.Equal(t, "19/18/17/16/15/14/13/12/11/10", new.policy.slru.probation.display())

count = new.policy.sketch.Estimate(store.hasher.hash(5))
require.True(t, count > 5)

}

func TestStorePersistenceTTL(t *testing.T) {
store := NewStore[int, int](1000, false)
for i := 0; i < 10; i++ {
_ = store.Set(i, i, 1, 2*time.Second)
}
for i := 10; i < 20; i++ {
_ = store.Set(i, i, 1, 5*time.Second)
}
for i := 20; i < 30; i++ {
_ = store.Set(i, i, 1, 1*time.Second)
}
time.Sleep(200 * time.Millisecond)

f, err := os.Create("stest")
defer os.Remove("stest")
require.Nil(t, err)
err = store.Persist(0, f)
require.Nil(t, err)
f.Close()
// expire 20-29
time.Sleep(time.Second)
new := NewStore[int, int](1000, false)
f, err = os.Open("stest")
require.Nil(t, err)
err = new.Recover(0, f)
require.Nil(t, err)
f.Close()
m := map[int]int{}
new.Range(func(key, value int) bool {
m[key] = value
return true
})
require.Equal(t, 20, len(m))
time.Sleep(2 * time.Second)
for i := 0; i < 10; i++ {
_, ok := new.Get(i)
require.False(t, ok)
}
for i := 10; i < 20; i++ {
_, ok := new.Get(i)
require.True(t, ok)
}
time.Sleep(3 * time.Second)
for i := 10; i < 20; i++ {
_, ok := new.Get(i)
require.False(t, ok)
}
}

func TestStorePersistenceResize(t *testing.T) {
store := NewStore[int, int](1000, false)
for i := 0; i < 1000; i++ {
_ = store.Set(i, i, 1, 0)
}
time.Sleep(200 * time.Millisecond)
for i := 0; i < 500; i++ {
_, _ = store.Get(i)
}
store.drainRead()
// now 0-499 in protected and 500-999 in probation
require.Equal(t, 500, store.policy.slru.protected.Len())
require.Equal(t, 500, store.policy.slru.probation.Len())

f, err := os.Create("stest")
defer os.Remove("stest")
require.Nil(t, err)
err = store.Persist(0, f)
require.Nil(t, err)
f.Close()

new := NewStore[int, int](100, false)
f, err = os.Open("stest")
require.Nil(t, err)
err = new.Recover(0, f)
require.Nil(t, err)
f.Close()
// new cache protected size is 80, should contains latest 80 entries of original protected
require.Equal(t, 80, new.policy.slru.protected.Len())
// new cache probation size is 20, should contains latest 20 entries of original probation
require.Equal(t, 20, new.policy.slru.probation.Len())
expected := "499/498/497/496/495/494/493/492/491/490/489/488/487/486/485/484/483/482/481/480/479/478/477/476/475/474/473/472/471/470/469/468/467/466/465/464/463/462/461/460/459/458/457/456/455/454/453/452/451/450/449/448/447/446/445/444/443/442/441/440/439/438/437/436/435/434/433/432/431/430/429/428/427/426/425/424/423/422/421/420"
require.Equal(t, expected, new.policy.slru.protected.display())
expected = "999/998/997/996/995/994/993/992/991/990/989/988/987/986/985/984/983/982/981/980"
require.Equal(t, expected, new.policy.slru.probation.display())
}
Loading

0 comments on commit 29018ac

Please sign in to comment.