From cdf2966bbdec4acb0becb62f4efe11d44f61819a Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sat, 10 Feb 2024 17:51:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ereset=E5=87=BD=E6=95=B0?= =?UTF-8?q?=EF=BC=8CTODO=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- min_heap.go | 4 +++- min_heap_node.go | 2 +- time_wheel.go | 2 ++ time_wheel_node.go | 18 ++++++++++++++++-- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/min_heap.go b/min_heap.go index 9eb88f8..1b31aed 100644 --- a/min_heap.go +++ b/min_heap.go @@ -86,7 +86,9 @@ func (m *minHeap) removeTimeNode(node *minHeapNode) { func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) { m.mu.Lock() - heap.Push(&m.minHeaps, node) + node.userExpire = d + node.absExpire = time.Now().Add(d) + heap.Fix(&m.minHeaps, int(node.index)) select { case m.chAdd <- struct{}{}: default: diff --git a/min_heap_node.go b/min_heap_node.go index da31a88..3fd922c 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -18,7 +18,7 @@ func (m *minHeapNode) Stop() { m.root.removeTimeNode(m) } func (m *minHeapNode) Reset(d time.Duration) { - + m.root.resetTimeNode(m, d) } func (m *minHeapNode) Next(now time.Time) time.Time { diff --git a/time_wheel.go b/time_wheel.go index 7adfe75..b6d0580 100644 --- a/time_wheel.go +++ b/time_wheel.go @@ -129,6 +129,7 @@ func (t *timeWheel) AfterFunc(expire time.Duration, callback func()) TimeNoder { node := &timeNode{ expire: uint64(expire), callback: callback, + root: t, } return t.add(node, jiffies) @@ -148,6 +149,7 @@ func (t *timeWheel) ScheduleFunc(userExpire time.Duration, callback func()) Time expire: uint64(expire), callback: callback, isSchedule: true, + root: t, } return t.add(node, jiffies) diff --git a/time_wheel_node.go b/time_wheel_node.go index 3bb712e..b7aea4d 100644 --- a/time_wheel_node.go +++ b/time_wheel_node.go @@ -59,7 +59,7 @@ type timeNode struct { list unsafe.Pointer //存放表头信息 version uint64 //保存节点版本信息 isSchedule bool - + root *timeWheel list.Head } @@ -90,6 +90,20 @@ func (t *timeNode) Stop() { cpyList.Del(&t.Head) } -func (t *timeNode) Reset(d time.Duration) { +// warning: 该函数目前没有稳定 +func (t *timeNode) Reset(expire time.Duration) { + cpyList := (*Time)(atomic.LoadPointer(&t.list)) + cpyList.Lock() + defer cpyList.Unlock() + // TODO: 这里有一个问题,如果在执行Reset的时候,这个节点已经被移动到tmp链表 + // if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) { + // return + // } + cpyList.Del(&t.Head) + jiffies := atomic.LoadUint64(&t.root.jiffies) + + expire = expire/(time.Millisecond*10) + time.Duration(jiffies) + t.expire = uint64(expire) + t.root.add(t, atomic.LoadUint64(&t.root.jiffies)) }