From 0d3b0d0797e39dee22341d848aa16db4853898db Mon Sep 17 00:00:00 2001 From: shaj13 Date: Mon, 27 Jun 2022 19:25:01 +0300 Subject: [PATCH] chore: refactor and implement new event handlers --- arc/arc.go | 11 ++++-- cache.go | 22 ++++++++---- cache_test.go | 51 ++++++++++----------------- idle/idle.go | 3 +- internal/cache.go | 88 ++++++++++++++++++++++++++++++++++------------- 5 files changed, 110 insertions(+), 65 deletions(-) diff --git a/arc/arc.go b/arc/arc.go index 3a17fa7..1e13589 100644 --- a/arc/arc.go +++ b/arc/arc.go @@ -179,9 +179,14 @@ func (a *arc) RegisterOnExpired(f func(key, value interface{})) { a.t2.RegisterOnExpired(f) } -func (a *arc) Notify(fn func(libcache.Event), ops ...libcache.Op) { - a.t1.Notify(fn, ops...) - a.t2.Notify(fn, ops...) +func (a *arc) Notify(ch chan<- libcache.Event, ops ...libcache.Op) { + a.t1.Notify(ch, ops...) + a.t2.Notify(ch, ops...) +} + +func (a *arc) Ignore(ch chan<- libcache.Event, ops ...libcache.Op) { + a.t1.Ignore(ch, ops...) + a.t2.Ignore(ch, ops...) } func (a *arc) GC() time.Duration { diff --git a/cache.go b/cache.go index 1508d08..fd0c031 100644 --- a/cache.go +++ b/cache.go @@ -63,10 +63,14 @@ type Cache interface { // // Deprecated: use Notify instead. RegisterOnExpired(f func(key, value interface{})) - // Notify causes cahce to relay events to fn. - // If no operations are provided, all incoming operations will be relayed to fn. + // Notify causes cache to relay events to ch. + // If no operations are provided, all incoming operations will be relayed to ch. // Otherwise, just the provided operations will. - Notify(fn func(Event), ops ...Op) + Notify(ch chan<- Event, ops ...Op) + // Ignore causes the provided operations to be ignored. Ignore undoes the effect + // of any prior calls to Notify for the provided operations. + // If no operations are provided, ch removed. + Ignore(ch chan<- Event, ops ...Op) // GC runs a garbage collection and blocks the caller until the // all expired items from the cache evicted. // @@ -101,8 +105,8 @@ func (c *cache) Peek(key interface{}) (interface{}, bool) { func (c *cache) Update(key interface{}, value interface{}) { c.mu.Lock() - defer c.mu.Unlock() c.unsafe.Update(key, value) + c.mu.Unlock() } func (c *cache) Store(key interface{}, value interface{}) { @@ -189,9 +193,15 @@ func (c *cache) RegisterOnExpired(f func(key, value interface{})) { c.mu.Unlock() } -func (c *cache) Notify(fn func(Event), ops ...Op) { +func (c *cache) Notify(ch chan<- Event, ops ...Op) { + c.mu.Lock() + c.unsafe.Notify(ch, ops...) + c.mu.Unlock() +} + +func (c *cache) Ignore(ch chan<- Event, ops ...Op) { c.mu.Lock() - c.unsafe.Notify(fn, ops...) + c.unsafe.Ignore(ch, ops...) c.mu.Unlock() } diff --git a/cache_test.go b/cache_test.go index 39f101b..b95a7b9 100644 --- a/cache_test.go +++ b/cache_test.go @@ -213,17 +213,15 @@ func TestOnEvicted(t *testing.T) { for _, tt := range cacheTests { t.Run("Test"+tt.cont.String()+"CacheOnEvicted", func(t *testing.T) { cache := tt.cont.New(20) - send := make(chan interface{}) + send := make(chan libcache.Event, 10) done := make(chan bool) evictedKeys := make([]interface{}, 0, 2) - cache.RegisterOnEvicted(func(key, value interface{}) { - send <- key - }) + cache.Notify(send, libcache.Remove) go func() { for { - key := <-send - evictedKeys = append(evictedKeys, key) + e := <-send + evictedKeys = append(evictedKeys, e.Key) if len(evictedKeys) >= 2 { done <- true return @@ -246,27 +244,6 @@ func TestOnEvicted(t *testing.T) { } } -func TestOnExpired(t *testing.T) { - for _, tt := range cacheTests { - t.Run("Test"+tt.cont.String()+"CacheOnExpired", func(t *testing.T) { - expiredKeys := make([]interface{}, 0, 2) - cache := tt.cont.New(0) - cache.RegisterOnExpired(func(key, _ interface{}) { - expiredKeys = append(expiredKeys, key) - }) - cache.SetTTL(time.Millisecond) - - cache.Store(1, 1234) - cache.Store(2, 1234) - - time.Sleep(time.Millisecond * 2) - cache.Peek(1) - - assert.ElementsMatch(t, []interface{}{1, 2}, expiredKeys) - }) - } -} - func TestExpiring(t *testing.T) { for _, tt := range cacheTests { t.Run("Test"+tt.cont.String()+"CacheExpiring", func(t *testing.T) { @@ -313,24 +290,33 @@ func TestNotify(t *testing.T) { for _, tt := range cacheTests { t.Run("Test"+tt.cont.String()+"CacheNotify", func(t *testing.T) { got := 0 + c := make(chan libcache.Event, 10) cache := tt.cont.New(0) - fn := func(e libcache.Event) { - t.Logf("Operation %s on Key %v \n", e.Op, e.Key) - got += e.Key.(int) - } - cache.Notify(fn, libcache.Read, libcache.Write, libcache.Remove) + cache.Notify(c) cache.Load(1) cache.StoreWithTTL(1, 0, time.Second) cache.Peek(1) cache.Delete(1) + close(c) + + for e := range c { + t.Logf("Operation %s on Key %v \n", e.Op, e.Key) + got += e.Key.(int) + } if tt.cont == libcache.ARC { assert.Equal(t, 7, got) } else { assert.Equal(t, 4, got) } + + // check it will not try to write on chan after ignore + cache.Ignore(c) + for i := 0; i < 10; i++ { + cache.Store(i, i) + } }) } } @@ -351,6 +337,7 @@ func TestGC(t *testing.T) { }) } } + func BenchmarkCache(b *testing.B) { for _, tt := range cacheTests { b.Run("Benchmark"+tt.cont.String()+"Cache", func(b *testing.B) { diff --git a/idle/idle.go b/idle/idle.go index 6442c24..9dad64f 100644 --- a/idle/idle.go +++ b/idle/idle.go @@ -36,4 +36,5 @@ func (idle) Purge() {} func (idle) SetTTL(ttl time.Duration) {} func (idle) RegisterOnExpired(f func(key, value interface{})) {} func (idle) RegisterOnEvicted(f func(key, value interface{})) {} -func (idle) Notify(fn func(libcache.Event), op ...libcache.Op) {} +func (idle) Notify(ch chan<- libcache.Event, ops ...libcache.Op) {} +func (idle) Ignore(ch chan<- libcache.Event, ops ...libcache.Op) {} diff --git a/internal/cache.go b/internal/cache.go index 2f24f74..f5d36e2 100644 --- a/internal/cache.go +++ b/internal/cache.go @@ -3,16 +3,15 @@ package internal import ( "container/heap" "fmt" - "strconv" "time" ) // Op describes a set of cache operations. -type Op int +type Op uint8 // These are the generalized cache operations that can trigger a event. const ( - Read Op = iota + Read Op = iota + 1 Write Remove maxOp @@ -31,6 +30,22 @@ func (op Op) String() string { } } +type handler struct { + mask [((maxOp - 1) + 7) / 8]uint8 +} + +func (h *handler) want(op Op) bool { + return (h.mask[op/8]>>uint8(op&7))&1 != 0 +} + +func (h *handler) set(op Op) { + h.mask[op/8] |= 1 << uint8(op&7) +} + +func (h *handler) clear(op Op) { + h.mask[op/8] &^= 1 << uint8(op&7) +} + // Collection represents the cache underlying data structure, // and defines the functions or operations that can be applied to the data elements. type Collection interface { @@ -77,7 +92,7 @@ type Cache struct { coll Collection heap expiringHeap entries map[interface{}]*Entry - events [maxOp][]func(Event) + handlers map[chan<- Event]*handler ttl time.Duration capacity int } @@ -165,7 +180,7 @@ func (c *Cache) Update(key, value interface{}) { func (c *Cache) Purge() { defer c.coll.Init() - if len(c.events[Remove]) == 0 { + if len(c.handlers) == 0 { c.entries = make(map[interface{}]*Entry) c.heap = nil return @@ -260,8 +275,14 @@ func (c *Cache) emit(op Op, k, v interface{}, exp time.Time, ok bool) { Ok: ok, } - for _, fn := range c.events[op] { - fn(e) + for c, h := range c.handlers { + if h.want(op) { + // send but do not block for it + select { + case c <- e: + default: + } + } } } @@ -304,38 +325,58 @@ func (c *Cache) Cap() int { return c.capacity } -// Notify causes cahce to relay events to fn. -// If no operations are provided, all incoming operations will be relayed to fn. +// Notify causes cache to relay events to ch. +// If no operations are provided, all incoming operations will be relayed to ch. // Otherwise, just the provided operations will. -func (c *Cache) Notify(fn func(Event), ops ...Op) { +func (c *Cache) Notify(ch chan<- Event, ops ...Op) { + if ch == nil { + panic("libcache: Notify using nil channel") + } + + h := new(handler) + c.handlers[ch] = h + + if len(ops) == 0 { + for i := 1; i <= int(maxOp); i++ { + h.set(Op(i)) + } + return + } + + for _, op := range ops { + h.set(op) + } +} + +// Ignore causes the provided ops to be ignored. Ignore undoes the effect +// of any prior calls to Notify for the provided ops. +// If no ops are provided, ch removed. +func (c *Cache) Ignore(ch chan<- Event, ops ...Op) { if len(ops) == 0 { - ops = append(ops, Read, Write, Remove) + delete(c.handlers, ch) + return + } + + h, ok := c.handlers[ch] + if !ok { + return } for _, op := range ops { - if op < 0 && op >= maxOp { - panic("libcache: notify on unknown operation #" + strconv.Itoa(int(op))) - } - c.events[op] = append(c.events[op], fn) + h.clear(op) } } // RegisterOnEvicted registers a function, // to call it when an entry is purged from the cache. func (c *Cache) RegisterOnEvicted(fn func(key, value interface{})) { - c.Notify(func(e Event) { - fn(e.Key, e.Value) - }, Remove) + panic("RegisterOnEvicted no longer available") } // RegisterOnExpired registers a function, // to call it when an entry TTL elapsed. func (c *Cache) RegisterOnExpired(fn func(key, value interface{})) { - c.Notify(func(e Event) { - if e.Expiry.Before(time.Now()) { - fn(e.Key, e.Value) - } - }, Remove) + panic("RegisterOnExpired no longer available") } // New return new abstracted cache. @@ -344,6 +385,7 @@ func New(c Collection, cap int) *Cache { coll: c, capacity: cap, entries: make(map[interface{}]*Entry), + handlers: make(map[chan<- Event]*handler), } }