Skip to content

Commit

Permalink
Ftr: change timeQ from limit chan to ultimate chan (#77)
Browse files Browse the repository at this point in the history
* Ftr: change timeQ from limit chan to ultimate chan

* fmt import

* remove full error

* add error log

* remove \n
  • Loading branch information
Lvnszn authored Nov 18, 2021
1 parent 26777ca commit 7eddead
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions time/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import (
uatomic "go.uber.org/atomic"
)

var (
// nolint
ErrTimeChannelFull = errors.New("timer channel full")
// nolint
ErrTimeChannelClosed = errors.New("timer channel closed")
import (
gxchan "github.com/dubbogo/gost/container/chan"
gxlog "github.com/dubbogo/gost/log"
)

// nolint
var ErrTimeChannelClosed = errors.New("timer channel closed")

// InitDefaultTimerWheel initializes a default timer wheel
func InitDefaultTimerWheel() {
defaultTimerWheelOnce.Do(func() {
Expand Down Expand Up @@ -164,7 +165,7 @@ type TimerWheel struct {
slot [maxTimerLevel]*list.List // timer list

enable uatomic.Bool // timer ready or closed
timerQ chan *timerNodeAction // timer event notify channel
timerQ *gxchan.UnboundedChan // timer event notify channel

once sync.Once // for close ticker
ticker *time.Ticker // virtual atomic clock
Expand All @@ -177,7 +178,7 @@ func NewTimerWheel() *TimerWheel {
clock: atomic.LoadInt64(&curGxTime),
// in fact, the minimum time accuracy is 10ms.
ticker: time.NewTicker(time.Duration(minTickerInterval)),
timerQ: make(chan *timerNodeAction, timerNodeQueueSize),
timerQ: gxchan.NewUnboundedChan(timerNodeQueueSize),
}

w.enable.Store(true)
Expand All @@ -191,10 +192,8 @@ func NewTimerWheel() *TimerWheel {
go func() {
defer w.wg.Done()
var (
t time.Time
cFlag bool
nodeAction *timerNodeAction
qFlag bool
t time.Time
cFlag bool
)

LOOP:
Expand All @@ -214,11 +213,12 @@ func NewTimerWheel() *TimerWheel {
if ret == 0 {
w.run()
}
case nodeAction, qFlag = <-w.timerQ:
case node, qFlag := <-w.timerQ.Out():
if !qFlag {
break LOOP
}

nodeAction := node.(*timerNodeAction)
// just one w.timerQ channel to ensure the exec sequence of timer event.
switch {
case nodeAction.action == TimerActionAdd:
Expand Down Expand Up @@ -509,13 +509,10 @@ func (w *TimerWheel) AddTimer(f TimerFunc, typ TimerType, period time.Duration,
t := &Timer{w: w}
node := newTimerNode(f, typ, int64(period), arg)
select {
case w.timerQ <- &timerNodeAction{node: node, action: TimerActionAdd}:
case w.timerQ.In() <- &timerNodeAction{node: node, action: TimerActionAdd}:
t.ID = node.ID
return t, nil
default:
}

return nil, ErrTimeChannelFull
}

func (w *TimerWheel) deleteTimer(t *Timer) error {
Expand All @@ -524,12 +521,9 @@ func (w *TimerWheel) deleteTimer(t *Timer) error {
}

select {
case w.timerQ <- &timerNodeAction{action: TimerActionDel, node: &timerNode{ID: t.ID}}:
case w.timerQ.In() <- &timerNodeAction{action: TimerActionDel, node: &timerNode{ID: t.ID}}:
return nil
default:
}

return ErrTimeChannelFull
}

func (w *TimerWheel) resetTimer(t *Timer, d time.Duration) error {
Expand All @@ -538,12 +532,9 @@ func (w *TimerWheel) resetTimer(t *Timer, d time.Duration) error {
}

select {
case w.timerQ <- &timerNodeAction{action: TimerActionReset, node: &timerNode{ID: t.ID, period: int64(d)}}:
case w.timerQ.In() <- &timerNodeAction{action: TimerActionReset, node: &timerNode{ID: t.ID, period: int64(d)}}:
return nil
default:
}

return ErrTimeChannelFull
}

func sendTime(_ TimerID, t time.Time, arg interface{}) error {
Expand Down Expand Up @@ -571,6 +562,7 @@ func (w *TimerWheel) NewTimer(d time.Duration) *Timer {
return t
}

gxlog.CError("addTimer fail, err is %v", err)
close(c)
return nil
}
Expand Down Expand Up @@ -627,6 +619,7 @@ func (w *TimerWheel) NewTicker(d time.Duration) *Ticker {
return (*Ticker)(timer)
}

gxlog.CError("addTimer fail, err is %v", err)
close(c)
return nil
}
Expand All @@ -637,6 +630,7 @@ func (w *TimerWheel) TickFunc(d time.Duration, f func()) *Ticker {
if err == nil {
return (*Ticker)(t)
}
gxlog.CError("addTimer fail, err is %v", err)

return nil
}
Expand Down

0 comments on commit 7eddead

Please sign in to comment.