Skip to content

Commit

Permalink
refactor(vivid): optimize message dispatching for instant delivery
Browse files Browse the repository at this point in the history
Change the message enqueue logic to support instant message delivery without
locking the mailbox. This avoids modifying the message counter and improves
the performance for instantly delivered messages by using a separate channel.

BREAKING CHANGE: Mailbox now requires an additional parameter for Enqueue method
to specify if the message should be delivered instantly. This may affect the
clients relying on the previous signature of the Enqueue method.
  • Loading branch information
kercylan98 committed Jun 14, 2024
1 parent e306a53 commit cf23e79
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 110 deletions.
11 changes: 1 addition & 10 deletions minotaur/vivid/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,5 @@ func (d *_Dispatcher) Detach(system ActorSystemCore, actor ActorCore) {

func (d *_Dispatcher) Send(system ActorSystemCore, actor ActorCore, message MessageContext) bool {
mailbox := actor.GetMailbox()
if message.Instantly() {
mailbox.GetLockable().Lock()
defer func() {
actor.ModifyMessageCounter(-1)
mailbox.GetLockable().Unlock()
}()
onReceive(actor, message)
return true
}
return mailbox.Enqueue(message)
return mailbox.Enqueue(message, message.Instantly())
}
8 changes: 5 additions & 3 deletions minotaur/vivid/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ type Mailbox interface {
Stop()

// Enqueue 将一个消息放入队列
Enqueue(message MessageContext) bool
Enqueue(message MessageContext, instantly bool) bool
}

// GetLockable 获取队列的锁
GetLockable() sync.Locker
type instantlyMessage struct {
message MessageContext
mu sync.Mutex
}
124 changes: 74 additions & 50 deletions minotaur/vivid/mailbox_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,47 @@ type (

func NewFIFO(handler func(message MessageContext), opts ...*FIFOOptions) *FIFO {
f := &FIFO{
opts: NewFIFOOptions().Apply(opts...),
status: fifoStateNone,
cond: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
handler: handler,
opts: NewFIFOOptions().Apply(opts...),
status: fifoStateNone,
cond: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
handler: handler,
instantly: make(chan *instantlyMessage, 1),
}
f.buffer = buffer.NewRing[MessageContext](int(f.opts.BufferSize))
return f
}

// FIFO 是一个先进先出的消息队列
type FIFO struct {
opts *FIFOOptions // 配置
status fifoState // 队列状态
cond *sync.Cond // 消息队列条件变量
buffer *buffer.Ring[MessageContext] // 消息缓冲区
closed chan struct{} // 关闭信号
handler func(message MessageContext) // 消息处理函数
opts *FIFOOptions // 配置
status fifoState // 队列状态
cond *sync.Cond // 消息队列条件变量
buffer *buffer.Ring[MessageContext] // 消息缓冲区
closed chan struct{} // 关闭信号
handler func(message MessageContext) // 消息处理函数
instantly chan *instantlyMessage // 立即处理的消息
}

func (f *FIFO) Start() {
f.cond.L.Lock()
if f.status != fifoStateNone {
f.cond.L.Unlock()
func (m *FIFO) Start() {
m.cond.L.Lock()
if m.status != fifoStateNone {
m.cond.L.Unlock()
return
}
f.status = fifoStateRunning
f.cond.L.Unlock()
m.status = fifoStateRunning
m.cond.L.Unlock()

f.closed = make(chan struct{})
m.closed = make(chan struct{})
go func(f *FIFO) {
defer func(f *FIFO) {
close(f.closed)
f.buffer.Reset()
}(f)

for {
f.processInstantly()

f.cond.L.Lock()
elements := f.buffer.ReadAll()
if len(elements) == 0 {
Expand All @@ -76,60 +80,80 @@ func (f *FIFO) Start() {
f.cond.L.Unlock()

for i := 0; i < len(elements); i++ {
f.processInstantly()
elem := elements[i]
f.handler(elem)
}
}
}(f)
}(m)
}

func (f *FIFO) Stop() {
f.cond.L.Lock()
if f.status != fifoStateRunning {
f.cond.L.Unlock()
func (m *FIFO) Stop() {
m.cond.L.Lock()
if m.status != fifoStateRunning {
m.cond.L.Unlock()
return
}
f.status = fifoStateStopping
f.cond.L.Unlock()
m.status = fifoStateStopping
m.cond.L.Unlock()

f.cond.Signal()
m.cond.Signal()

<-f.closed
<-m.closed
}

func (f *FIFO) Enqueue(message MessageContext) bool {
f.cond.L.Lock()
if f.status != fifoStateNone {
if f.status != fifoStateRunning {
if f.opts.StopMode != FIFOStopModeDrain {
f.cond.L.Unlock()
func (m *FIFO) Enqueue(message MessageContext, instantly bool) bool {
m.cond.L.Lock()
if m.status != fifoStateNone {
if m.status != fifoStateRunning {
if m.opts.StopMode != FIFOStopModeDrain {
m.cond.L.Unlock()
return false
}
}
}

f.buffer.Write(message)
f.cond.L.Unlock()
f.cond.Broadcast()
return true
}
if instantly {
m.cond.L.Unlock()
elem := &instantlyMessage{message: message}
elem.mu.Lock()
m.instantly <- elem
m.cond.Broadcast()
elem.mu.Lock()
elem.mu.Unlock()
return true
}

func (f *FIFO) GetLockable() sync.Locker {
return f.cond.L
m.buffer.Write(message)
m.cond.L.Unlock()
m.cond.Broadcast()
return true
}

func (f *FIFO) reset() {
f.cond.L.Lock()
if f.status < fifoStateStopping {
f.cond.L.Unlock()
f.Stop()
func (m *FIFO) reset() {
m.cond.L.Lock()
if m.status < fifoStateStopping {
m.cond.L.Unlock()
m.Stop()
} else {
f.cond.L.Unlock()
m.cond.L.Unlock()
}

f.cond.L.Lock()
defer f.cond.L.Unlock()
m.cond.L.Lock()
defer m.cond.L.Unlock()

f.buffer.Reset()
f.status = fifoStateNone
m.buffer.Reset()
m.status = fifoStateNone

close(m.instantly)
m.instantly = make(chan *instantlyMessage, 1)
}

func (m *FIFO) processInstantly() {
select {
case elem := <-m.instantly:
defer elem.mu.Unlock()
m.handler(elem.message)
default:
}
}
114 changes: 67 additions & 47 deletions minotaur/vivid/mailbox_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,37 @@ type (

func NewPriority(handler func(message MessageContext), opts ...*PriorityOptions) *Priority {
p := &Priority{
opts: NewPriorityOptions().Apply(opts...),
status: priorityStateNone,
cond: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
handler: handler,
opts: NewPriorityOptions().Apply(opts...),
status: priorityStateNone,
cond: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
handler: handler,
instantly: make(chan *instantlyMessage, 1),
}
p.buffer = make(priorityHeap, 0, p.opts.BufferSize)
return p
}

type Priority struct {
opts *PriorityOptions // 配置
status priorityState // 队列状态
cond *sync.Cond // 消息队列条件变量
buffer priorityHeap // 消息缓冲区
closed chan struct{} // 关闭信号
handler func(message MessageContext) // 消息处理函数
opts *PriorityOptions // 配置
status priorityState // 队列状态
cond *sync.Cond // 消息队列条件变量
buffer priorityHeap // 消息缓冲区
closed chan struct{} // 关闭信号
handler func(message MessageContext) // 消息处理函数
instantly chan *instantlyMessage // 立即处理的消息
}

func (p *Priority) Start() {
p.cond.L.Lock()
if p.status != priorityStateNone {
p.cond.L.Unlock()
func (m *Priority) Start() {
m.cond.L.Lock()
if m.status != priorityStateNone {
m.cond.L.Unlock()
return
}
p.status = priorityStateRunning
p.cond.L.Unlock()
m.status = priorityStateRunning
m.cond.L.Unlock()

p.closed = make(chan struct{})
m.closed = make(chan struct{})
go func(p *Priority) {
defer func(p *Priority) {
close(p.closed)
Expand All @@ -63,6 +65,7 @@ func (p *Priority) Start() {
}(p)

for {
p.processInstantly()
p.cond.L.Lock()
if p.buffer.Len() == 0 {
if p.status == priorityStateStopping {
Expand All @@ -81,48 +84,65 @@ func (p *Priority) Start() {

p.handler(msg)
}
}(p)
}(m)
}

func (p *Priority) Stop() {
p.cond.L.Lock()
if p.status != priorityStateRunning {
p.cond.L.Unlock()
func (m *Priority) Stop() {
m.cond.L.Lock()
if m.status != priorityStateRunning {
m.cond.L.Unlock()
return
}
p.status = priorityStateStopping
p.cond.L.Unlock()
p.cond.Signal()
m.status = priorityStateStopping
m.cond.L.Unlock()
m.cond.Signal()
}

func (p *Priority) Enqueue(message MessageContext) bool {
p.cond.L.Lock()
if p.status != priorityStateRunning {
p.cond.L.Unlock()
func (m *Priority) Enqueue(message MessageContext, instantly bool) bool {
m.cond.L.Lock()
if m.status != priorityStateRunning {
m.cond.L.Unlock()
return false
}
heap.Push(&p.buffer, message)
p.cond.L.Unlock()
p.cond.Signal()
return true
}

func (p *Priority) GetLockable() sync.Locker {
return p.cond.L
if instantly {
m.cond.L.Unlock()
elem := &instantlyMessage{message: message}
elem.mu.Lock()
m.instantly <- elem
m.cond.Broadcast()
elem.mu.Lock()
elem.mu.Unlock()
return true
}

heap.Push(&m.buffer, message)
m.cond.L.Unlock()
m.cond.Signal()
return true
}

func (p *Priority) reset() {
p.cond.L.Lock()
if p.status < fifoStateStopping {
p.cond.L.Unlock()
p.Stop()
func (m *Priority) reset() {
m.cond.L.Lock()
if m.status < fifoStateStopping {
m.cond.L.Unlock()
m.Stop()
} else {
p.cond.L.Unlock()
m.cond.L.Unlock()
}

p.cond.L.Lock()
defer p.cond.L.Unlock()
m.cond.L.Lock()
defer m.cond.L.Unlock()

m.status = priorityStateNone
m.buffer = m.buffer[:0]
}

p.status = priorityStateNone
p.buffer = p.buffer[:0]
func (m *Priority) processInstantly() {
select {
case elem := <-m.instantly:
defer elem.mu.Unlock()
m.handler(elem.message)
default:
}
}

0 comments on commit cf23e79

Please sign in to comment.