Skip to content

Commit

Permalink
chore: refactor and implement new event handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
shaj13 committed Jun 27, 2022
1 parent b0fa077 commit 0d3b0d0
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 65 deletions.
11 changes: 8 additions & 3 deletions arc/arc.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,14 @@ func (a *arc) RegisterOnExpired(f func(key, value interface{})) {
a.t2.RegisterOnExpired(f)
}

func (a *arc) Notify(fn func(libcache.Event), ops ...libcache.Op) {
a.t1.Notify(fn, ops...)
a.t2.Notify(fn, ops...)
func (a *arc) Notify(ch chan<- libcache.Event, ops ...libcache.Op) {
a.t1.Notify(ch, ops...)
a.t2.Notify(ch, ops...)
}

func (a *arc) Ignore(ch chan<- libcache.Event, ops ...libcache.Op) {
a.t1.Ignore(ch, ops...)
a.t2.Ignore(ch, ops...)
}

func (a *arc) GC() time.Duration {
Expand Down
22 changes: 16 additions & 6 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ type Cache interface {
//
// Deprecated: use Notify instead.
RegisterOnExpired(f func(key, value interface{}))
// Notify causes cahce to relay events to fn.
// If no operations are provided, all incoming operations will be relayed to fn.
// Notify causes cache to relay events to ch.
// If no operations are provided, all incoming operations will be relayed to ch.
// Otherwise, just the provided operations will.
Notify(fn func(Event), ops ...Op)
Notify(ch chan<- Event, ops ...Op)
// Ignore causes the provided operations to be ignored. Ignore undoes the effect
// of any prior calls to Notify for the provided operations.
// If no operations are provided, ch removed.
Ignore(ch chan<- Event, ops ...Op)
// GC runs a garbage collection and blocks the caller until the
// all expired items from the cache evicted.
//
Expand Down Expand Up @@ -101,8 +105,8 @@ func (c *cache) Peek(key interface{}) (interface{}, bool) {

func (c *cache) Update(key interface{}, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.unsafe.Update(key, value)
c.mu.Unlock()
}

func (c *cache) Store(key interface{}, value interface{}) {
Expand Down Expand Up @@ -189,9 +193,15 @@ func (c *cache) RegisterOnExpired(f func(key, value interface{})) {
c.mu.Unlock()
}

func (c *cache) Notify(fn func(Event), ops ...Op) {
func (c *cache) Notify(ch chan<- Event, ops ...Op) {
c.mu.Lock()
c.unsafe.Notify(ch, ops...)
c.mu.Unlock()
}

func (c *cache) Ignore(ch chan<- Event, ops ...Op) {
c.mu.Lock()
c.unsafe.Notify(fn, ops...)
c.unsafe.Ignore(ch, ops...)
c.mu.Unlock()
}

Expand Down
51 changes: 19 additions & 32 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,15 @@ func TestOnEvicted(t *testing.T) {
for _, tt := range cacheTests {
t.Run("Test"+tt.cont.String()+"CacheOnEvicted", func(t *testing.T) {
cache := tt.cont.New(20)
send := make(chan interface{})
send := make(chan libcache.Event, 10)
done := make(chan bool)
evictedKeys := make([]interface{}, 0, 2)
cache.RegisterOnEvicted(func(key, value interface{}) {
send <- key
})
cache.Notify(send, libcache.Remove)

go func() {
for {
key := <-send
evictedKeys = append(evictedKeys, key)
e := <-send
evictedKeys = append(evictedKeys, e.Key)
if len(evictedKeys) >= 2 {
done <- true
return
Expand All @@ -246,27 +244,6 @@ func TestOnEvicted(t *testing.T) {
}
}

func TestOnExpired(t *testing.T) {
for _, tt := range cacheTests {
t.Run("Test"+tt.cont.String()+"CacheOnExpired", func(t *testing.T) {
expiredKeys := make([]interface{}, 0, 2)
cache := tt.cont.New(0)
cache.RegisterOnExpired(func(key, _ interface{}) {
expiredKeys = append(expiredKeys, key)
})
cache.SetTTL(time.Millisecond)

cache.Store(1, 1234)
cache.Store(2, 1234)

time.Sleep(time.Millisecond * 2)
cache.Peek(1)

assert.ElementsMatch(t, []interface{}{1, 2}, expiredKeys)
})
}
}

func TestExpiring(t *testing.T) {
for _, tt := range cacheTests {
t.Run("Test"+tt.cont.String()+"CacheExpiring", func(t *testing.T) {
Expand Down Expand Up @@ -313,24 +290,33 @@ func TestNotify(t *testing.T) {
for _, tt := range cacheTests {
t.Run("Test"+tt.cont.String()+"CacheNotify", func(t *testing.T) {
got := 0
c := make(chan libcache.Event, 10)
cache := tt.cont.New(0)
fn := func(e libcache.Event) {
t.Logf("Operation %s on Key %v \n", e.Op, e.Key)
got += e.Key.(int)
}

cache.Notify(fn, libcache.Read, libcache.Write, libcache.Remove)
cache.Notify(c)

cache.Load(1)
cache.StoreWithTTL(1, 0, time.Second)
cache.Peek(1)
cache.Delete(1)
close(c)

for e := range c {
t.Logf("Operation %s on Key %v \n", e.Op, e.Key)
got += e.Key.(int)
}

if tt.cont == libcache.ARC {
assert.Equal(t, 7, got)
} else {
assert.Equal(t, 4, got)
}

// check it will not try to write on chan after ignore
cache.Ignore(c)
for i := 0; i < 10; i++ {
cache.Store(i, i)
}
})
}
}
Expand All @@ -351,6 +337,7 @@ func TestGC(t *testing.T) {
})
}
}

func BenchmarkCache(b *testing.B) {
for _, tt := range cacheTests {
b.Run("Benchmark"+tt.cont.String()+"Cache", func(b *testing.B) {
Expand Down
3 changes: 2 additions & 1 deletion idle/idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ func (idle) Purge() {}
func (idle) SetTTL(ttl time.Duration) {}
func (idle) RegisterOnExpired(f func(key, value interface{})) {}
func (idle) RegisterOnEvicted(f func(key, value interface{})) {}
func (idle) Notify(fn func(libcache.Event), op ...libcache.Op) {}
func (idle) Notify(ch chan<- libcache.Event, ops ...libcache.Op) {}
func (idle) Ignore(ch chan<- libcache.Event, ops ...libcache.Op) {}
88 changes: 65 additions & 23 deletions internal/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package internal
import (
"container/heap"
"fmt"
"strconv"
"time"
)

// Op describes a set of cache operations.
type Op int
type Op uint8

// These are the generalized cache operations that can trigger a event.
const (
Read Op = iota
Read Op = iota + 1
Write
Remove
maxOp
Expand All @@ -31,6 +30,22 @@ func (op Op) String() string {
}
}

type handler struct {
mask [((maxOp - 1) + 7) / 8]uint8
}

func (h *handler) want(op Op) bool {
return (h.mask[op/8]>>uint8(op&7))&1 != 0
}

func (h *handler) set(op Op) {
h.mask[op/8] |= 1 << uint8(op&7)
}

func (h *handler) clear(op Op) {
h.mask[op/8] &^= 1 << uint8(op&7)
}

// Collection represents the cache underlying data structure,
// and defines the functions or operations that can be applied to the data elements.
type Collection interface {
Expand Down Expand Up @@ -77,7 +92,7 @@ type Cache struct {
coll Collection
heap expiringHeap
entries map[interface{}]*Entry
events [maxOp][]func(Event)
handlers map[chan<- Event]*handler
ttl time.Duration
capacity int
}
Expand Down Expand Up @@ -165,7 +180,7 @@ func (c *Cache) Update(key, value interface{}) {
func (c *Cache) Purge() {
defer c.coll.Init()

if len(c.events[Remove]) == 0 {
if len(c.handlers) == 0 {
c.entries = make(map[interface{}]*Entry)
c.heap = nil
return
Expand Down Expand Up @@ -260,8 +275,14 @@ func (c *Cache) emit(op Op, k, v interface{}, exp time.Time, ok bool) {
Ok: ok,
}

for _, fn := range c.events[op] {
fn(e)
for c, h := range c.handlers {
if h.want(op) {
// send but do not block for it
select {
case c <- e:
default:
}
}
}
}

Expand Down Expand Up @@ -304,38 +325,58 @@ func (c *Cache) Cap() int {
return c.capacity
}

// Notify causes cahce to relay events to fn.
// If no operations are provided, all incoming operations will be relayed to fn.
// Notify causes cache to relay events to ch.
// If no operations are provided, all incoming operations will be relayed to ch.
// Otherwise, just the provided operations will.
func (c *Cache) Notify(fn func(Event), ops ...Op) {
func (c *Cache) Notify(ch chan<- Event, ops ...Op) {
if ch == nil {
panic("libcache: Notify using nil channel")
}

h := new(handler)
c.handlers[ch] = h

if len(ops) == 0 {
for i := 1; i <= int(maxOp); i++ {
h.set(Op(i))
}
return
}

for _, op := range ops {
h.set(op)
}
}

// Ignore causes the provided ops to be ignored. Ignore undoes the effect
// of any prior calls to Notify for the provided ops.
// If no ops are provided, ch removed.
func (c *Cache) Ignore(ch chan<- Event, ops ...Op) {
if len(ops) == 0 {
ops = append(ops, Read, Write, Remove)
delete(c.handlers, ch)
return
}

h, ok := c.handlers[ch]
if !ok {
return
}

for _, op := range ops {
if op < 0 && op >= maxOp {
panic("libcache: notify on unknown operation #" + strconv.Itoa(int(op)))
}
c.events[op] = append(c.events[op], fn)
h.clear(op)
}
}

// RegisterOnEvicted registers a function,
// to call it when an entry is purged from the cache.
func (c *Cache) RegisterOnEvicted(fn func(key, value interface{})) {
c.Notify(func(e Event) {
fn(e.Key, e.Value)
}, Remove)
panic("RegisterOnEvicted no longer available")
}

// RegisterOnExpired registers a function,
// to call it when an entry TTL elapsed.
func (c *Cache) RegisterOnExpired(fn func(key, value interface{})) {
c.Notify(func(e Event) {
if e.Expiry.Before(time.Now()) {
fn(e.Key, e.Value)
}
}, Remove)
panic("RegisterOnExpired no longer available")
}

// New return new abstracted cache.
Expand All @@ -344,6 +385,7 @@ func New(c Collection, cap int) *Cache {
coll: c,
capacity: cap,
entries: make(map[interface{}]*Entry),
handlers: make(map[chan<- Event]*handler),
}
}

Expand Down

0 comments on commit 0d3b0d0

Please sign in to comment.