Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed atomic types to avoid alignment problems on 32 bit platforms #26

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions time_wheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (t *timeWheel) cascade(levelIndex int, index int) {
l.ReplaceInit(&tmp.Head)

// 每次链表的元素被移动走,都修改version
atomic.AddUint64(&l.version, 1)
l.version.Add(1)
l.Unlock()

offset := unsafe.Offsetof(tmp.Head)
Expand Down Expand Up @@ -220,7 +220,7 @@ func (t *timeWheel) moveAndExec() {
head := newTimeHead(0, 0)
t1 := t.t1[index]
t1.ReplaceInit(&head.Head)
atomic.AddUint64(&t1.version, 1)
t1.version.Add(1)
t.t1[index].Unlock()

// 执行,链表中的定时器
Expand All @@ -230,7 +230,7 @@ func (t *timeWheel) moveAndExec() {
val := (*timeNode)(pos.Entry(offset))
head.Del(pos)

if atomic.LoadUint32(&val.stop) == haveStop {
if val.stop.Load() == haveStop {
return
}

Expand Down
16 changes: 8 additions & 8 deletions time_wheel_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ type Time struct {
// level 在near盘子里就是1, 在T2ToTt[0]盘子里就是2起步
// index 就是各自盘子的索引值
// seq 自增id
version uint64
version atomic.Uint64
}

func newTimeHead(level uint64, index uint64) *Time {
head := &Time{}
head.version = genVersionHeight(level, index)
head.version.Store(genVersionHeight(level, index))
head.Init()
return head
}
Expand All @@ -44,23 +44,23 @@ func genVersionHeight(level uint64, index uint64) uint64 {
func (t *Time) lockPushBack(node *timeNode, level uint64, index uint64) {
t.Lock()
defer t.Unlock()
if atomic.LoadUint32(&node.stop) == haveStop {
if node.stop.Load() == haveStop {
return
}

t.AddTail(&node.Head)
atomic.StorePointer(&node.list, unsafe.Pointer(t))
//更新节点的version信息
atomic.StoreUint64(&node.version, atomic.LoadUint64(&t.version))
node.version.Store(t.version.Load())
}

type timeNode struct {
expire uint64
userExpire time.Duration
callback func()
stop uint32
stop atomic.Uint32
list unsafe.Pointer //存放表头信息
version uint64 //保存节点版本信息
version atomic.Uint64 //保存节点版本信息
isSchedule bool
root *timeWheel
list.Head
Expand All @@ -78,15 +78,15 @@ type timeNode struct {
// 2和3.2状态会是没有锁保护下的操作,会有数据竞争
func (t *timeNode) Stop() bool {

atomic.StoreUint32(&t.stop, haveStop)
t.stop.Store(haveStop)

// 使用版本号算法让timeNode知道自己是否被移动了
// timeNode的version和表头的version一样表示没有被移动可以直接删除
// 如果不一样,可能在第2或者3.2状态,使用惰性删除
cpyList := (*Time)(atomic.LoadPointer(&t.list))
cpyList.Lock()
defer cpyList.Unlock()
if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) {
if t.version.Load() != cpyList.version.Load() {
return false
}

Expand Down
Loading