Skip to content

Commit

Permalink
Added a function to evict all elements older than the cache TTL (#5464)
Browse files Browse the repository at this point in the history
What changed?
Added functionality to evict all elements in the LRU cache older than the TTL

Why?
This will enable us to clean out the cache periodically. This is useful for the workflow specific rate limits, as we will only need to keep items which are accessed very frequently.

We will set the TTL to a low value and periodically clear out the elements that are above this limit to save memory.

This way there is no need to set and manage a specific cache size

How did you test it?
Unit tests

Potential risks
No risk, just adding functionality

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Dec 5, 2023
1 parent 9d3ab0b commit 42bdd9e
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 22 deletions.
8 changes: 8 additions & 0 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ type Options struct {
// to control the max size in bytes of the cache
// It is required option if MaxCount is not provided
MaxSize uint64

// ActivelyEvict will evict items that has expired TTL at every operation in the cache
// This can be expensive if a lot of items expire at the same time
// Should be used when it's important for memory that the expired items are evicted as soon as possible
// If not set expired items will be evicted when one of these happens
// - when the cache is full
// - when the item is accessed
ActivelyEvict bool
}

// SimpleOptions provides options that can be used to configure SimpleCache
Expand Down
73 changes: 51 additions & 22 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ const cacheCountLimit = 1 << 25
// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
activelyEvict bool
// We use this instead of time.Now() in order to make testing easier
now func() time.Time
}

iteratorImpl struct {
Expand Down Expand Up @@ -114,7 +117,7 @@ func (c *lru) Iterator() Iterator {
c.mut.Lock()
iterator := &iteratorImpl{
lru: c,
createTime: time.Now(),
createTime: c.now(),
nextItem: c.byAccess.Front(),
}
iterator.prepareNext()
Expand All @@ -141,11 +144,13 @@ func New(opts *Options) Cache {
}

cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
activelyEvict: opts.ActivelyEvict,
now: time.Now,
}

cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0
Expand All @@ -169,14 +174,16 @@ func (c *lru) Get(key interface{}) interface{} {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

element := c.byKey[key]
if element == nil {
return nil
}

entry := element.Value.(*entryImpl)

if c.isEntryExpired(entry, time.Now()) {
if c.isEntryExpired(entry, c.now()) {
// Entry has expired
c.deleteInternal(element)
return nil
Expand Down Expand Up @@ -218,6 +225,8 @@ func (c *lru) Delete(key interface{}) {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

element := c.byKey[key]
if element != nil {
c.deleteInternal(element)
Expand All @@ -242,28 +251,48 @@ func (c *lru) Size() int {
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

return len(c.byKey)
}

// evictExpiredItems evicts all items in the cache which are expired
func (c *lru) evictExpiredItems() {
if !c.activelyEvict {
return // do nothing if activelyEvict is not set
}

now := c.now()
for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() {
if !c.isEntryExpired(elt.Value.(*entryImpl), now) {
// List is sorted by item age, so we can stop as soon as we found first non expired item.
break
}
c.deleteInternal(elt)
}
}

// Put puts a new value associated with a given key, returning the existing value (if present)
// allowUpdate flag is used to control overwrite behavior if the value exists
func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) {
valueSize := c.sizeFunc(value)
c.mut.Lock()
defer c.mut.Unlock()

c.evictExpiredItems()

elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, time.Now()) {
if c.isEntryExpired(entry, c.now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
entry.createTime = c.now()
}
}

Expand All @@ -285,7 +314,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

if c.ttl != 0 {
entry.createTime = time.Now()
entry.createTime = c.now()
}

c.byKey[key] = c.byAccess.PushFront(entry)
Expand Down
40 changes: 40 additions & 0 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type keyType struct {
Expand Down Expand Up @@ -384,3 +385,42 @@ func TestPanicOptionsIsNil(t *testing.T) {

New(nil)
}

func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
// Create the cache with a TTL of 75s
cache, ok := New(&Options{
MaxCount: 5,
TTL: time.Second * 75,
ActivelyEvict: true,
}).(*lru)
require.True(t, ok)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", 1)
require.NoError(t, err)
_, err = cache.PutIfNotExist("B", 2)
require.NoError(t, err)

// Nothing is expired after 50s
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 2, cache.Size())

_, err = cache.PutIfNotExist("C", 3)
require.NoError(t, err)
_, err = cache.PutIfNotExist("D", 4)
require.NoError(t, err)

// No time has passed, so still nothing is expired
assert.Equal(t, 4, cache.Size())

// Advance time to 100s, so A and B should be expired
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 2, cache.Size())

// Advance time to 150s, so C and D should be expired as well
currentTime = currentTime.Add(time.Second * 50)
assert.Equal(t, 0, cache.Size())
}

0 comments on commit 42bdd9e

Please sign in to comment.