Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a function to evict all elements older than the cache TTL #5464

Merged
merged 15 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ type Options struct {
// to control the max size in bytes of the cache
// It is required option if MaxCount is not provided
MaxSize uint64

// ActivelyEvict will evict items that has expired TTL at every operation in the cache
// This can be expensive if a lot of items expire at the same time
// Should be used when it's important for memory that the expired items are evicted as soon as possible
// If not set expired items will be evicted when one of these happens
// - when the cache is full
// - when the item is accessed
ActivelyEvict bool
}

// SimpleOptions provides options that can be used to configure SimpleCache
Expand Down
73 changes: 51 additions & 22 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ const cacheCountLimit = 1 << 25
// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
activelyEvict bool
// We use this instead of time.Now() in order to make testing easier
now func() time.Time
}

iteratorImpl struct {
Expand Down Expand Up @@ -114,7 +117,7 @@ func (c *lru) Iterator() Iterator {
c.mut.Lock()
iterator := &iteratorImpl{
lru: c,
createTime: time.Now(),
createTime: c.now(),
nextItem: c.byAccess.Front(),
}
iterator.prepareNext()
Expand All @@ -141,11 +144,13 @@ func New(opts *Options) Cache {
}

cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
activelyEvict: opts.ActivelyEvict,
now: time.Now,
}

cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0
Expand All @@ -169,14 +174,16 @@ func (c *lru) Get(key interface{}) interface{} {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

element := c.byKey[key]
if element == nil {
return nil
}

entry := element.Value.(*entryImpl)

if c.isEntryExpired(entry, time.Now()) {
if c.isEntryExpired(entry, c.now()) {
// Entry has expired
c.deleteInternal(element)
return nil
Expand Down Expand Up @@ -218,6 +225,8 @@ func (c *lru) Delete(key interface{}) {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

element := c.byKey[key]
if element != nil {
c.deleteInternal(element)
Expand All @@ -242,28 +251,48 @@ func (c *lru) Size() int {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

return len(c.byKey)
}

// evictExpiredItems evicts all items in the cache which are expired
func (c *lru) evictExpiredItems() {
if !c.activelyEvict {
return // do nothing if activelyEvict is not set
}

now := c.now()
for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() {
if !c.isEntryExpired(elt.Value.(*entryImpl), now) {
// List is sorted by item age, so we can stop as soon as we found first non expired item.
break
}
c.deleteInternal(elt)
}
}

// Put puts a new value associated with a given key, returning the existing value (if present)
// allowUpdate flag is used to control overwrite behavior if the value exists
func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) {
valueSize := c.sizeFunc(value)
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, time.Now()) {
if c.isEntryExpired(entry, c.now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
entry.createTime = c.now()
}
}

Expand All @@ -285,7 +314,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

if c.ttl != 0 {
entry.createTime = time.Now()
entry.createTime = c.now()
}

c.byKey[key] = c.byAccess.PushFront(entry)
Expand Down
40 changes: 40 additions & 0 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type keyType struct {
Expand Down Expand Up @@ -384,3 +385,42 @@ func TestPanicOptionsIsNil(t *testing.T) {

New(nil)
}

func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
// Create the cache with a TTL of 75s
cache, ok := New(&Options{
MaxCount: 5,
TTL: time.Second * 75,
ActivelyEvict: true,
}).(*lru)
require.True(t, ok)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", 1)
require.NoError(t, err)
_, err = cache.PutIfNotExist("B", 2)
require.NoError(t, err)

// Nothing is expired after 50s
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 2, cache.Size())

_, err = cache.PutIfNotExist("C", 3)
require.NoError(t, err)
_, err = cache.PutIfNotExist("D", 4)
require.NoError(t, err)

// No time has passed, so still nothing is expired
assert.Equal(t, 4, cache.Size())

// Advance time to 100s, so A and B should be expired
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 2, cache.Size())

// Advance time to 150s, so C and D should be expired as well
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 0, cache.Size())
}