Skip to content

Commit

Permalink
[exp/singleflight] refactor ticker code
Browse files Browse the repository at this point in the history
Change-Id: I64bfadfdbad7179c8f3937a2b4a11bef7d923d37
  • Loading branch information
jxskiss committed Nov 30, 2023
1 parent 5700064 commit 0942ea0
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 37 deletions.
40 changes: 6 additions & 34 deletions exp/singleflight/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,8 @@ import (
"sync/atomic"
"time"
"unsafe"

"github.com/jxskiss/gopkg/v2/perf/mselect"
)

var (
mselOnce sync.Once
msel mselect.ManySelect
)

func initManySelect() {
mselOnce.Do(func() { msel = mselect.New() })
}

// ErrFetchTimeout indicates a timeout error when refresh a cached value
// if CacheOptions.FetchTimeout is specified.
var ErrFetchTimeout = errors.New("fetch timeout")
Expand Down Expand Up @@ -103,11 +92,8 @@ type Cache struct {
group Group
data sync.Map

refreshTicker *time.Ticker
refreshTask *mselect.Task

expireTicker *time.Ticker
expireTask *mselect.Task
refreshTicker callbackTicker
expireTicker callbackTicker

closed int32
}
Expand All @@ -119,20 +105,10 @@ func NewCache(opt CacheOptions) *Cache {
opt: opt,
}
if opt.RefreshInterval > 0 {
initManySelect()
ticker := time.NewTicker(opt.RefreshInterval)
task := mselect.NewTask(ticker.C, nil, c.doRefresh)
msel.Add(task)
c.refreshTicker = ticker
c.refreshTask = task
c.refreshTicker = newCallbackTicker(opt.RefreshInterval, c.doRefresh)
}
if opt.ExpireInterval > 0 {
initManySelect()
ticker := time.NewTicker(opt.ExpireInterval)
task := mselect.NewTask(ticker.C, nil, c.doExpire)
msel.Add(task)
c.expireTicker = ticker
c.expireTask = task
c.expireTicker = newCallbackTicker(opt.ExpireInterval, c.doExpire)
}

return c
Expand All @@ -149,15 +125,11 @@ func (c *Cache) Close() {
}
if c.refreshTicker != nil {
c.refreshTicker.Stop()
msel.Delete(c.refreshTask)
c.refreshTicker = nil
c.refreshTask = nil
}
if c.expireTicker != nil {
c.expireTicker.Stop()
msel.Delete(c.expireTask)
c.expireTicker = nil
c.expireTask = nil
}
}

Expand Down Expand Up @@ -292,7 +264,7 @@ func (c *Cache) DeleteFunc(match func(key string) bool) {
})
}

func (c *Cache) doRefresh(_ time.Time, _ bool) {
func (c *Cache) doRefresh() {
if atomic.LoadInt32(&c.closed) > 0 {
return
}
Expand Down Expand Up @@ -325,7 +297,7 @@ func (c *Cache) doRefresh(_ time.Time, _ bool) {
})
}

func (c *Cache) doExpire(_ time.Time, _ bool) {
func (c *Cache) doExpire() {
if atomic.LoadInt32(&c.closed) > 0 {
return
}
Expand Down
6 changes: 3 additions & 3 deletions exp/singleflight/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,17 @@ func TestExpire(t *testing.T) {
assert.True(t, trigger)

// first expire will mark entries as inactive
c.doExpire(time.Time{}, true)
c.doExpire()

trigger = false
c.Get("alive")
assert.False(t, trigger)

// second expire, both default & expire will be removed
c.doExpire(time.Time{}, true)
c.doExpire()

// make sure refresh does not affect expire
c.doRefresh(time.Time{}, true)
c.doRefresh()

trigger = false
c.Get("alive")
Expand Down
90 changes: 90 additions & 0 deletions exp/singleflight/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package singleflight

import (
"sync"
"time"

"github.com/jxskiss/gopkg/v2/perf/mselect"
)

// For low frequency ticker, we use the mselect to reduce the number of goroutines.
const lowFrequencyThreshold = 5 * time.Second

var (
mselOnce sync.Once
msel mselect.ManySelect
)

func initManySelect() {
mselOnce.Do(func() { msel = mselect.New() })
}

type callbackTicker interface {
Stop()
}

func newCallbackTicker(d time.Duration, callback func()) callbackTicker {
if d < lowFrequencyThreshold {
return newStdTicker(d, callback)
}
return newManySelectTicker(d, callback)
}

type stdTickerImpl struct {
ticker *time.Ticker
close chan struct{}
callback func()
}

func newStdTicker(d time.Duration, callback func()) *stdTickerImpl {
impl := &stdTickerImpl{
ticker: time.NewTicker(d),
close: make(chan struct{}),
callback: callback,
}
go impl.run()
return impl
}

func (t *stdTickerImpl) Stop() {
t.ticker.Stop()
close(t.close)
}

func (t *stdTickerImpl) run() {
for {
select {
case <-t.ticker.C:
t.callback()
case <-t.close:
return
}
}
}

type manySelectTickerImpl struct {
ticker *time.Ticker
task *mselect.Task
}

func newManySelectTicker(d time.Duration, asyncCallback func()) *manySelectTickerImpl {
initManySelect()
ticker := time.NewTicker(d)
task := mselect.NewTask(ticker.C, nil,
func(_ time.Time, ok bool) {
if ok {
asyncCallback()
}
})
msel.Add(task)
impl := &manySelectTickerImpl{
ticker: ticker,
task: task,
}
return impl
}

func (t *manySelectTickerImpl) Stop() {
t.ticker.Stop()
msel.Delete(t.task)
}
53 changes: 53 additions & 0 deletions exp/singleflight/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package singleflight

import (
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestCallbackTicker(t *testing.T) {
ticker1 := newCallbackTicker(time.Second, func() {})
assert.IsType(t, &stdTickerImpl{}, ticker1)

ticker2 := newManySelectTicker(time.Second, func() {})
assert.IsType(t, &manySelectTickerImpl{}, ticker2)

time.Sleep(100 * time.Millisecond)
ticker1.Stop()
ticker2.Stop()
}

func TestStdTickerImpl(t *testing.T) {
var count int32
ticker := newStdTicker(100*time.Millisecond, func() {
atomic.AddInt32(&count, 1)
})

time.Sleep(1100 * time.Millisecond)
ticker.Stop()
n1 := atomic.LoadInt32(&count)
assert.True(t, n1 >= 9 && n1 <= 11)

time.Sleep(300 * time.Millisecond)
n2 := atomic.LoadInt32(&count)
assert.True(t, n2 >= 9 && n2 <= 11)
}

func TestManySelectTickerImpl(t *testing.T) {
var count int32
ticker := newManySelectTicker(100*time.Millisecond, func() {
atomic.AddInt32(&count, 1)
})

time.Sleep(1100 * time.Millisecond)
ticker.Stop()
n1 := atomic.LoadInt32(&count)
assert.True(t, n1 >= 9 && n1 <= 11)

time.Sleep(300 * time.Millisecond)
n2 := atomic.LoadInt32(&count)
assert.True(t, n2 >= 9 && n2 <= 11)
}

0 comments on commit 0942ea0

Please sign in to comment.