diff --git a/README.md b/README.md index 944ff3f..5b1174b 100644 --- a/README.md +++ b/README.md @@ -166,26 +166,14 @@ import ( func main() { cache := libcache.LRU.New(10) - cache.RegisterOnEvicted(func(key, value interface{}) { - fmt.Printf("Cache Key %v Evicted\n", key) - }) - - cache.RegisterOnExpired(func(key, value interface{}) { - fmt.Printf("Cache Key %v Expired, Removing it from cache\n", key) - // use delete directly when your application - // guarantee no other goroutine can store items with the same key. - // Peek also invoke lazy expiry. - // - // Note this should done only with safe cache. - cache.Peek(key) - }) - - for i:= 0 ; i < 10 ; i++ { - cache.StoreWithTTL(i, i, time.Microsecond) + fn := func(e libcache.Event) { + fmt.Printf("Operation %s on Key %v \n", e.Op, e.Key) } - - time.Sleep(time.Second) - fmt.Println(cache.Len()) + cache.Notify(fn, libcache.Read, libcache.Write, libcache.Remove) + cache.Load(1) + cache.Store(1) + cache.Peek(1) + cache.Delete(1) } ``` diff --git a/arc/arc.go b/arc/arc.go index 53bf3a5..2642b5f 100644 --- a/arc/arc.go +++ b/arc/arc.go @@ -179,6 +179,11 @@ func (a *arc) RegisterOnExpired(f func(key, value interface{})) { a.t2.RegisterOnExpired(f) } +func (a *arc) Notify(fn func(libcache.Event), ops ...libcache.Op) { + a.t1.Notify(fn, ops...) + a.t2.Notify(fn, ops...) +} + func min(x, y int) int { if x < y { return x diff --git a/cache.go b/cache.go index 58cd7b4..192c300 100644 --- a/cache.go +++ b/cache.go @@ -4,8 +4,23 @@ package libcache import ( "sync" "time" + + "github.com/shaj13/libcache/internal" +) + +// These are the generalized cache operations that can trigger a event. +const ( + Read = internal.Read + Write = internal.Write + Remove = internal.Remove ) +// Op describes a set of cache operations. +type Op = internal.Op + +// Event represents a single cache entry change. +type Event = internal.Event + // Cache stores data so that future requests for that data can be served faster. type Cache interface { // Load returns key value. @@ -39,24 +54,24 @@ type Cache interface { // SetTTL sets entries default TTL. SetTTL(time.Duration) // RegisterOnEvicted registers a function, - // to call in its own goroutine when an entry is purged from the cache. + // to call it when an entry is purged from the cache. + // + // Deprecated: use Notify instead. RegisterOnEvicted(f func(key, value interface{})) // RegisterOnExpired registers a function, - // to call in its own goroutine when an entry TTL elapsed. - // invocation of f, does not mean the entry is purged from the cache, - // if need be, it must coordinate with the cache explicitly. + // to call it when an entry TTL elapsed. // - // var cache cache.Cache - // onExpired := func(key, value interface{}) { - // _, _, _ = cache.Peek(key) - // } - // - // This should not be done unless the cache thread-safe. + // Deprecated: use Notify instead. RegisterOnExpired(f func(key, value interface{})) + + // Notify causes cahce to relay events to fn. + // If no operations are provided, all incoming operations will be relayed to fn. + // Otherwise, just the provided operations will. + Notify(fn func(Event), ops ...Op) } type cache struct { - mu sync.RWMutex + mu sync.Mutex unsafe Cache } @@ -97,8 +112,8 @@ func (c *cache) Delete(key interface{}) { } func (c *cache) Keys() []interface{} { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.unsafe.Keys() } @@ -121,20 +136,20 @@ func (c *cache) Resize(s int) int { } func (c *cache) Len() int { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.unsafe.Len() } func (c *cache) Cap() int { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.unsafe.Cap() } func (c *cache) TTL() time.Duration { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.unsafe.TTL() } @@ -156,8 +171,14 @@ func (c *cache) RegisterOnExpired(f func(key, value interface{})) { c.unsafe.RegisterOnExpired(f) } +func (c *cache) Notify(fn func(Event), ops ...Op) { + c.mu.Lock() + defer c.mu.Unlock() + c.unsafe.Notify(fn, ops...) +} + func (c *cache) Expiry(key interface{}) (time.Time, bool) { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.unsafe.Expiry(key) } diff --git a/cache_test.go b/cache_test.go index 6b6d338..dee6fbd 100644 --- a/cache_test.go +++ b/cache_test.go @@ -249,34 +249,18 @@ func TestOnEvicted(t *testing.T) { func TestOnExpired(t *testing.T) { for _, tt := range cacheTests { t.Run("Test"+tt.cont.String()+"CacheOnExpired", func(t *testing.T) { - send := make(chan interface{}) - done := make(chan bool) expiredKeys := make([]interface{}, 0, 2) cache := tt.cont.New(0) cache.RegisterOnExpired(func(key, _ interface{}) { - send <- key + expiredKeys = append(expiredKeys, key) }) cache.SetTTL(time.Millisecond) - go func() { - for { - key := <-send - expiredKeys = append(expiredKeys, key) - if len(expiredKeys) >= 2 { - done <- true - return - } - } - }() - cache.Store(1, 1234) cache.Store(2, 1234) - select { - case <-done: - case <-time.After(time.Second * 2): - t.Fatal("TestOnExpired timeout exceeded, expected to receive expired keys") - } + time.Sleep(time.Millisecond * 2) + cache.Peek(1) assert.ElementsMatch(t, []interface{}{1, 2}, expiredKeys) }) @@ -350,3 +334,30 @@ func BenchmarkCache(b *testing.B) { }) } } + +func TestNotify(t *testing.T) { + for _, tt := range cacheTests { + t.Run("Test"+tt.cont.String()+"CacheNotify", func(t *testing.T) { + got := 0 + cache := tt.cont.New(0) + fn := func(e libcache.Event) { + t.Logf("Operation %s on Key %v \n", e.Op, e.Key) + got += e.Key.(int) + } + + cache.Notify(fn, libcache.Read, libcache.Write, libcache.Remove) + + cache.Load(1) + cache.StoreWithTTL(1, 0, time.Second) + cache.Peek(1) + cache.Delete(1) + + if tt.cont == libcache.ARC { + assert.Equal(t, 7, got) + } else { + assert.Equal(t, 4, got) + } + }) + } + +} diff --git a/idle/idle.go b/idle/idle.go index cde53f5..c0feadc 100644 --- a/idle/idle.go +++ b/idle/idle.go @@ -35,3 +35,4 @@ func (idle) Purge() {} func (idle) SetTTL(ttl time.Duration) {} func (idle) RegisterOnExpired(f func(key, value interface{})) {} func (idle) RegisterOnEvicted(f func(key, value interface{})) {} +func (idle) Notify(fn func(libcache.Event), op ...libcache.Op) {} diff --git a/internal/cache.go b/internal/cache.go index c7c8ca3..b957f9a 100644 --- a/internal/cache.go +++ b/internal/cache.go @@ -2,9 +2,35 @@ package internal import ( "container/heap" + "fmt" + "strconv" "time" ) +// Op describes a set of cache operations. +type Op int + +// These are the generalized cache operations that can trigger a event. +const ( + Read Op = iota + Write + Remove + maxOp +) + +func (op Op) String() string { + switch op { + case Read: + return "READ" + case Write: + return "WRITE" + case Remove: + return "REMOVE" + default: + return "UNKNOWN" + } +} + // Collection represents the cache underlying data structure, // and defines the functions or operations that can be applied to the data elements. type Collection interface { @@ -16,48 +42,44 @@ type Collection interface { Init() } +// Event represents a single cache entry change. +type Event struct { + // Op represents cache operation that triggered the event. + Op Op + // Key represents cache entry key. + Key interface{} + // Value represents cache key value. + Value interface{} + // Expiry represents cache key value expiry time. + Expiry time.Time + // Ok report whether the read operation succeed. + Ok bool +} + +// String returns a string representation of the event in the form +// "file: REMOVE|WRITE|..." +func (e Event) String() string { + return fmt.Sprintf("%v: %s", e.Key, e.Op.String()) +} + // Entry is used to hold a value in the cache. type Entry struct { Key interface{} Value interface{} Element interface{} Exp time.Time - timer *time.Timer index int - cancel chan struct{} -} - -// start/stop timer added for safety to prevent fire on expired callback, -// when entry re-stored at the expiry time. -func (e *Entry) startTimer(d time.Duration, f func(key, value interface{})) { - e.cancel = make(chan struct{}) - e.timer = time.AfterFunc(d, func() { - select { - case <-e.cancel: - default: - f(e.Key, e.Value) - } - }) -} - -func (e *Entry) stopTimer() { - if e.timer == nil { - return - } - e.timer.Stop() - close(e.cancel) } // Cache is an abstracted cache that provides a skeletal implementation, // of the Cache interface to minimize the effort required to implement interface. type Cache struct { - coll Collection - heap expiringHeap - entries map[interface{}]*Entry - onEvicted func(key, value interface{}) - onExpired func(key, value interface{}) - ttl time.Duration - capacity int + coll Collection + heap expiringHeap + entries map[interface{}]*Entry + events [maxOp][]func(Event) + ttl time.Duration + capacity int } // Load returns key value. @@ -70,19 +92,21 @@ func (c *Cache) Peek(key interface{}) (interface{}, bool) { return c.get(key, true) } -func (c *Cache) get(key interface{}, peek bool) (v interface{}, found bool) { +func (c *Cache) get(key interface{}, peek bool) (interface{}, bool) { // Run GC inline before return the entry. c.gc() e, ok := c.entries[key] if !ok { - return + c.emit(Read, key, nil, time.Time{}, ok) + return nil, ok } if !peek { c.coll.Move(e) } + c.emit(Read, key, e.Value, e.Exp, ok) return e.Value, ok } @@ -112,9 +136,6 @@ func (c *Cache) StoreWithTTL(key, value interface{}, ttl time.Duration) { e := &Entry{Key: key, Value: value} if ttl > 0 { - if c.onExpired != nil { - e.startTimer(ttl, c.onExpired) - } e.Exp = time.Now().UTC().Add(ttl) heap.Push(&c.heap, e) } @@ -123,13 +144,20 @@ func (c *Cache) StoreWithTTL(key, value interface{}, ttl time.Duration) { if c.capacity != 0 && c.Len() >= c.capacity { c.Discard() } + c.coll.Add(e) + c.emit(Write, e.Key, e.Value, e.Exp, false) } // Update the key value without updating the underlying "rank". func (c *Cache) Update(key, value interface{}) { + // Run GC inline before update the entry. + c.gc() + if c.Contains(key) { - c.entries[key].Value = value + e := c.entries[key] + e.Value = value + c.emit(Write, e.Key, e.Value, e.Exp, false) } } @@ -137,7 +165,7 @@ func (c *Cache) Update(key, value interface{}) { func (c *Cache) Purge() { defer c.coll.Init() - if c.onEvicted == nil { + if len(c.events[Remove]) == 0 { c.entries = make(map[interface{}]*Entry) return } @@ -208,7 +236,6 @@ func (c *Cache) Discard() (key, value interface{}) { func (c *Cache) removeEntry(e *Entry) { c.coll.Remove(e) - e.stopTimer() delete(c.entries, e.Key) // Remove entry from the heap, the entry may does not exist because // it has zero ttl or already popped up by gc @@ -220,8 +247,20 @@ func (c *Cache) removeEntry(e *Entry) { // evict remove entry and fire on evicted callback. func (c *Cache) evict(e *Entry) { c.removeEntry(e) - if c.onEvicted != nil { - go c.onEvicted(e.Key, e.Value) + c.emit(Remove, e.Key, e.Value, e.Exp, false) +} + +func (c *Cache) emit(op Op, k, v interface{}, exp time.Time, ok bool) { + e := Event{ + Op: op, + Key: k, + Value: v, + Expiry: exp, + Ok: ok, + } + + for _, fn := range c.events[op] { + fn(e) } } @@ -234,7 +273,7 @@ func (c *Cache) gc() { return } e := heap.Pop(&c.heap).(*Entry) - c.removeEntry(e) + c.evict(e) } } @@ -253,16 +292,38 @@ func (c *Cache) Cap() int { return c.capacity } +// Notify causes cahce to relay events to fn. +// If no operations are provided, all incoming operations will be relayed to fn. +// Otherwise, just the provided operations will. +func (c *Cache) Notify(fn func(Event), ops ...Op) { + if len(ops) == 0 { + ops = append(ops, Read, Write, Remove) + } + + for _, op := range ops { + if op >= maxOp { + panic("libcache: notify on unknown operation #" + strconv.Itoa(int(op))) + } + c.events[op] = append(c.events[op], fn) + } +} + // RegisterOnEvicted registers a function, -// to call in its own goroutine when an entry is purged from the cache. -func (c *Cache) RegisterOnEvicted(f func(key, value interface{})) { - c.onEvicted = f +// to call it when an entry is purged from the cache. +func (c *Cache) RegisterOnEvicted(fn func(key, value interface{})) { + c.Notify(func(e Event) { + fn(e.Key, e.Value) + }, Remove) } // RegisterOnExpired registers a function, -// to call in its own goroutine when an entry TTL elapsed. -func (c *Cache) RegisterOnExpired(f func(key, value interface{})) { - c.onExpired = f +// to call it when an entry TTL elapsed. +func (c *Cache) RegisterOnExpired(fn func(key, value interface{})) { + c.Notify(func(e Event) { + if e.Expiry.Before(time.Now()) { + fn(e.Key, e.Value) + } + }, Remove) } // New return new abstracted cache. diff --git a/policy.go b/policy.go index eac08eb..0cce89a 100644 --- a/policy.go +++ b/policy.go @@ -49,7 +49,7 @@ func (c ReplacementPolicy) Available() bool { // New panics if the cache replacement policy function is not linked into the binary. func (c ReplacementPolicy) New(cap int) Cache { cache := new(cache) - cache.mu = sync.RWMutex{} + cache.mu = sync.Mutex{} cache.unsafe = c.NewUnsafe(cap) return cache }