Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sets with TTL #122

Merged
merged 15 commits into from
Jan 28, 2020
77 changes: 54 additions & 23 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto/z"
)
Expand All @@ -33,6 +34,8 @@ const (
setBufSize = 32 * 1024
)

type onEvictFunc func(uint64, uint64, interface{}, int64)

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
// from as many goroutines as you want.
Expand All @@ -48,7 +51,7 @@ type Cache struct {
// contention.
setBuf chan *item
// onEvict is called for item evictions.
onEvict func(uint64, uint64, interface{}, int64)
onEvict onEvictFunc
// KeyToHash function is used to customize the key hashing algorithm.
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
Expand All @@ -57,6 +60,8 @@ type Cache struct {
stop chan struct{}
// cost calculates cost from a value.
cost func(value interface{}) int64
// cleanupTicker is used to periodically check for entries whose TTL has passed.
cleanupTicker *time.Ticker
// Metrics contains a running log of important statistics like hits, misses,
// and dropped items.
Metrics *Metrics
Expand Down Expand Up @@ -115,11 +120,12 @@ const (

// item is passed to setBuf so items can eventually be added to the cache.
type item struct {
flag itemFlag
key uint64
conflict uint64
value interface{}
cost int64
flag itemFlag
key uint64
conflict uint64
value interface{}
cost int64
expiration time.Time
}

// NewCache returns a new Cache instance and any configuration errors, if any.
Expand All @@ -134,14 +140,15 @@ func NewCache(config *Config) (*Cache, error) {
}
policy := newPolicy(config.NumCounters, config.MaxCost)
cache := &Cache{
store: newStore(),
policy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *item, setBufSize),
onEvict: config.OnEvict,
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
cost: config.Cost,
store: newStore(newExpirationMap()),
policy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *item, setBufSize),
onEvict: config.OnEvict,
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
cost: config.Cost,
cleanupTicker: time.NewTicker(bucketSize),
}
if cache.keyToHash == nil {
cache.keyToHash = z.KeyToHash
Expand Down Expand Up @@ -184,20 +191,42 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) {
// the cost parameter to 0 and Coster will be ran when needed in order to find
// the items true cost.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
return c.SetWithTTL(key, value, cost, 0*time.Second)
}

// SetWithTTL works like Set but adds a key-value pair to the cache that will expire
// after the specified TTL (time to live) has passed. A zero value means the value never
// exexpire, which is identical to calling Set. A negative value is a no-op and the value
// is discarded.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
if c == nil || key == nil {
return false
}

var expiration time.Time
switch {
case ttl == 0:
// No expiration.
break
case ttl < 0:
// Treat this a a no-op.
return false
default:
expiration = time.Now().Add(ttl)
}

keyHash, conflictHash := c.keyToHash(key)
i := &item{
flag: itemNew,
key: keyHash,
conflict: conflictHash,
value: value,
cost: cost,
flag: itemNew,
key: keyHash,
conflict: conflictHash,
value: value,
cost: cost,
expiration: expiration,
}
// Attempt to immediately update hashmap value and set flag to update so the
// cost is eventually updated.
if c.store.Update(keyHash, conflictHash, i.value) {
// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
if c.store.Update(i) {
i.flag = itemUpdate
}
// Attempt to send item to policy.
Expand Down Expand Up @@ -277,7 +306,7 @@ func (c *Cache) processItems() {
case itemNew:
victims, added := c.policy.Add(i.key, i.cost)
if added {
c.store.Set(i.key, i.conflict, i.value)
c.store.Set(i)
c.Metrics.add(keyAdd, i.key, 1)
}
for _, victim := range victims {
Expand All @@ -294,6 +323,8 @@ func (c *Cache) processItems() {
c.policy.Del(i.key) // Deals with metrics updates.
c.store.Del(i.key, i.conflict)
}
case <-c.cleanupTicker.C:
c.store.Cleanup(c.policy, c.onEvict)
case <-c.stop:
return
}
Expand Down
73 changes: 72 additions & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/dgraph-io/ristretto/z"
"github.com/stretchr/testify/require"
)

var wait = time.Millisecond * 10
Expand Down Expand Up @@ -273,7 +274,12 @@ func TestCacheGet(t *testing.T) {
panic(err)
}
key, conflict := z.KeyToHash(1)
c.store.Set(key, conflict, 1)
i := item{
key: key,
conflict: conflict,
value: 1,
}
c.store.Set(&i)
if val, ok := c.Get(1); val == nil || !ok {
t.Fatal("get should be successful")
}
Expand Down Expand Up @@ -338,6 +344,65 @@ func TestCacheSet(t *testing.T) {
}
}

func TestCacheSetWithTTL(t *testing.T) {
m := &sync.Mutex{}
evicted := make(map[uint64]struct{})
c, err := NewCache(&Config{
NumCounters: 100,
MaxCost: 10,
BufferItems: 64,
Metrics: true,
OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
m.Lock()
defer m.Unlock()
evicted[key] = struct{}{}
},
})
require.NoError(t, err)

// retrySet calls SetWithTTL until the item is accepted by the cache.
retrySet := func(key, value int, cost int64, ttl time.Duration) {
for {
if set := c.SetWithTTL(key, value, cost, ttl); !set {
time.Sleep(wait)
continue
}

time.Sleep(wait)
val, ok := c.Get(key)
require.True(t, ok)
require.NotNil(t, val)
require.Equal(t, value, val.(int))
return
}
}

retrySet(1, 1, 1, time.Second)

// Sleep to make sure the item has expired after execution resumes.
time.Sleep(2 * time.Second)
val, ok := c.Get(1)
require.False(t, ok)
require.Nil(t, val)

// Sleep to ensure that the bucket where the item was stored has been cleared
// from the expiraton map.
time.Sleep(5 * time.Second)
m.Lock()
require.Equal(t, 1, len(evicted))
_, ok = evicted[1]
require.True(t, ok)
m.Unlock()

// Verify that expiration times are overwritten.
retrySet(2, 1, 1, 100*time.Millisecond)
retrySet(2, 2, 1, 100*time.Second)
time.Sleep(3 * time.Second)
val, ok = c.Get(2)
require.True(t, ok)
require.Equal(t, 2, val.(int))
}

func TestCacheDel(t *testing.T) {
c, err := NewCache(&Config{
NumCounters: 100,
Expand Down Expand Up @@ -524,3 +589,9 @@ func TestCacheMetricsClear(t *testing.T) {
c.Metrics = nil
c.Metrics.Clear()
}

func TestMain(m *testing.M) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SA3000: TestMain should call os.Exit to set exit code (from staticcheck)

// Set bucketSizeSecs to 1 to avoid waiting too much during the tests.
bucketSize = time.Second
m.Run()
}
Loading