diff --git a/common/cache/cache.go b/common/cache/cache.go index e4ac226619f..8cb26f5e09b 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -84,6 +84,14 @@ type Options struct { // to control the max size in bytes of the cache // It is required option if MaxCount is not provided MaxSize uint64 + + // ActivelyEvict will evict items that has expired TTL at every operation in the cache + // This can be expensive if a lot of items expire at the same time + // Should be used when it's important for memory that the expired items are evicted as soon as possible + // If not set expired items will be evicted when one of these happens + // - when the cache is full + // - when the item is accessed + ActivelyEvict bool } // SimpleOptions provides options that can be used to configure SimpleCache diff --git a/common/cache/lru.go b/common/cache/lru.go index f3d9742f594..ee00bc59ae6 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -38,18 +38,21 @@ const cacheCountLimit = 1 << 25 // lru is a concurrent fixed size cache that evicts elements in lru order type ( lru struct { - mut sync.Mutex - byAccess *list.List - byKey map[interface{}]*list.Element - maxCount int - ttl time.Duration - pin bool - rmFunc RemovedFunc - sizeFunc GetCacheItemSizeFunc - maxSize uint64 - currSize uint64 - sizeByKey map[interface{}]uint64 - isSizeBased bool + mut sync.Mutex + byAccess *list.List + byKey map[interface{}]*list.Element + maxCount int + ttl time.Duration + pin bool + rmFunc RemovedFunc + sizeFunc GetCacheItemSizeFunc + maxSize uint64 + currSize uint64 + sizeByKey map[interface{}]uint64 + isSizeBased bool + activelyEvict bool + // We use this instead of time.Now() in order to make testing easier + now func() time.Time } iteratorImpl struct { @@ -114,7 +117,7 @@ func (c *lru) Iterator() Iterator { c.mut.Lock() iterator := &iteratorImpl{ lru: c, - createTime: time.Now(), + createTime: c.now(), nextItem: c.byAccess.Front(), } iterator.prepareNext() @@ -141,11 +144,13 @@ func New(opts *Options) Cache { } cache := &lru{ - byAccess: list.New(), - byKey: make(map[interface{}]*list.Element, opts.InitialCapacity), - ttl: opts.TTL, - pin: opts.Pin, - rmFunc: opts.RemovedFunc, + byAccess: list.New(), + byKey: make(map[interface{}]*list.Element, opts.InitialCapacity), + ttl: opts.TTL, + pin: opts.Pin, + rmFunc: opts.RemovedFunc, + activelyEvict: opts.ActivelyEvict, + now: time.Now, } cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0 @@ -169,6 +174,8 @@ func (c *lru) Get(key interface{}) interface{} { c.mut.Lock() defer c.mut.Unlock() + c.evictExpiredItems() + element := c.byKey[key] if element == nil { return nil @@ -176,7 +183,7 @@ func (c *lru) Get(key interface{}) interface{} { entry := element.Value.(*entryImpl) - if c.isEntryExpired(entry, time.Now()) { + if c.isEntryExpired(entry, c.now()) { // Entry has expired c.deleteInternal(element) return nil @@ -218,6 +225,8 @@ func (c *lru) Delete(key interface{}) { c.mut.Lock() defer c.mut.Unlock() + c.evictExpiredItems() + element := c.byKey[key] if element != nil { c.deleteInternal(element) @@ -242,9 +251,27 @@ func (c *lru) Size() int { c.mut.Lock() defer c.mut.Unlock() + c.evictExpiredItems() + return len(c.byKey) } +// evictExpiredItems evicts all items in the cache which are expired +func (c *lru) evictExpiredItems() { + if !c.activelyEvict { + return // do nothing if activelyEvict is not set + } + + now := c.now() + for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() { + if !c.isEntryExpired(elt.Value.(*entryImpl), now) { + // List is sorted by item age, so we can stop as soon as we found first non expired item. + break + } + c.deleteInternal(elt) + } +} + // Put puts a new value associated with a given key, returning the existing value (if present) // allowUpdate flag is used to control overwrite behavior if the value exists func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) { @@ -252,10 +279,12 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) c.mut.Lock() defer c.mut.Unlock() + c.evictExpiredItems() + elt := c.byKey[key] if elt != nil { entry := elt.Value.(*entryImpl) - if c.isEntryExpired(entry, time.Now()) { + if c.isEntryExpired(entry, c.now()) { // Entry has expired c.deleteInternal(elt) } else { @@ -263,7 +292,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) if allowUpdate { entry.value = value if c.ttl != 0 { - entry.createTime = time.Now() + entry.createTime = c.now() } } @@ -285,7 +314,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) } if c.ttl != 0 { - entry.createTime = time.Now() + entry.createTime = c.now() } c.byKey[key] = c.byAccess.PushFront(entry) diff --git a/common/cache/lru_test.go b/common/cache/lru_test.go index 3ba0d1508c2..1d70c55ce16 100644 --- a/common/cache/lru_test.go +++ b/common/cache/lru_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type keyType struct { @@ -384,3 +385,42 @@ func TestPanicOptionsIsNil(t *testing.T) { New(nil) } + +func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) { + // Create the cache with a TTL of 75s + cache, ok := New(&Options{ + MaxCount: 5, + TTL: time.Second * 75, + ActivelyEvict: true, + }).(*lru) + require.True(t, ok) + + // We will capture this in the caches now function, and advance time as needed + currentTime := time.UnixMilli(0) + cache.now = func() time.Time { return currentTime } + + _, err := cache.PutIfNotExist("A", 1) + require.NoError(t, err) + _, err = cache.PutIfNotExist("B", 2) + require.NoError(t, err) + + // Nothing is expired after 50s + currentTime = currentTime.Add(time.Second * 50) + assert.Equal(t, 2, cache.Size()) + + _, err = cache.PutIfNotExist("C", 3) + require.NoError(t, err) + _, err = cache.PutIfNotExist("D", 4) + require.NoError(t, err) + + // No time has passed, so still nothing is expired + assert.Equal(t, 4, cache.Size()) + + // Advance time to 100s, so A and B should be expired + currentTime = currentTime.Add(time.Second * 50) + assert.Equal(t, 2, cache.Size()) + + // Advance time to 150s, so C and D should be expired as well + currentTime = currentTime.Add(time.Second * 50) + assert.Equal(t, 0, cache.Size()) +}