Skip to content

Commit

Permalink
feat(actor-context): add discardOld parameter to Become method
Browse files Browse the repository at this point in the history
Introduce an optional `discardOld` parameter to the `Become` method in the
actor context. This allows for the replacement of the current behavior without
keeping a stack of previous behaviors, optimizing memory usage in some cases.
  • Loading branch information
kercylan98 committed Jun 14, 2024
1 parent 306bb0f commit 0bf8601
Show file tree
Hide file tree
Showing 26 changed files with 141 additions and 1,202 deletions.
3 changes: 3 additions & 0 deletions minotaur/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func TestNewApplication(t *testing.T) {
case transport.ServerConnectionOpenedEvent:
conn := m.Conn

conn.Api().SetPacketHandler(func(ctx vivid.MessageContext, conn transport.Conn, packet transport.ConnectionReactPacketMessage) {

})
conn.Api().LoadMod(vivid.ModOf[AccountModExposer](&AccountMod{}))
conn.Api().LoadMod(vivid.ModOf[BagModExposer](&BagMod{}))

Expand Down
45 changes: 33 additions & 12 deletions minotaur/transport/conn_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,54 @@ import (
)

type ConnActor struct {
Conn net.Conn
Writer ConnWriter
Conn net.Conn
Writer vivid.ActorRef
Typed vivid.TypedActorRef[ConnActorTyped]
TerminateHandler ConnTerminateHandler
}

func (c *ConnActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case vivid.OnBoot:
case ConnectionInitMessage:
c.Conn, c.Writer = m.Conn, m.Writer
ctx.Become(vivid.BehaviorOf[ConnectionReactPacketMessage](c.onReactPacket))
case ConnectionBecomeReactPacketMessage:
ctx.Become(m.Behavior)
case ConnectionWritePacketMessage:
if err := c.Writer(m.Packet); err != nil {
ctx.Stop()
}
c.onInit(ctx, m)
case ConnectionSetPacketHandlerMessage:
behavior := vivid.BehaviorOf(func(ctx vivid.MessageContext, packet ConnectionReactPacketMessage) {
m.Handler(ctx, c.Typed, packet)
})
ctx.Become(behavior, true)
case ConnectionSetTerminateHandlerMessage:
c.TerminateHandler = m.Handler
case ConnectionLoadModMessage:
ctx.LoadMod(m.Mods...)
case ConnectionUnloadModMessage:
ctx.UnloadMod(m.Mods...)
case ConnectionApplyModMessage:
ctx.ApplyMod()
case vivid.OnTerminate:
if c.TerminateHandler != nil {
c.TerminateHandler(ctx, c.Typed, m)
}
_ = c.Conn.Close()
}
}

func (c *ConnActor) onReactPacket(ctx vivid.MessageContext, message ConnectionReactPacketMessage) {
ctx.Publish(ConnectionReceivePacketEvent{})
func (c *ConnActor) onInit(ctx vivid.MessageContext, m ConnectionInitMessage) {
c.Conn = m.Conn
c.Writer = ctx.ActorOf(vivid.OfO(func(options *vivid.ActorOptions[*ConnWriteActor]) {
options.WithName("writer")
options.WithInit(func(actor *ConnWriteActor) {
actor.Writer = m.Writer
})

options.WithSupervisor(func(message, reason vivid.Message) vivid.Directive {
// 关闭连接及其写入器
ctx.Stop()
return vivid.DirectiveStop
})
}))
c.Typed = vivid.Typed[ConnActorTyped](ctx.GetRef(), &ConnActorTypedImpl{
ConnActorRef: ctx.GetRef(),
ConnWriterActorRef: c.Writer,
})
}
28 changes: 17 additions & 11 deletions minotaur/transport/conn_actor_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,32 @@ import (
"net"
)

type (
ConnPacketHandler func(ctx vivid.MessageContext, conn Conn, packet ConnectionReactPacketMessage)
ConnTerminateHandler func(ctx vivid.MessageContext, conn Conn, message vivid.OnTerminate)
)

type (
// ConnectionInitMessage 连接初始化消息
ConnectionInitMessage struct {
Conn net.Conn // 连接
Writer ConnWriter // 连接写入器
Conn net.Conn // 连接
Writer ConnWriter // 连接写入器
ActorHook func(actor *ConnActor) // 连接 Actor 引用钩子
}

// ConnectionBecomeReactPacketMessage 切换响应数据包消息行为
ConnectionBecomeReactPacketMessage struct {
Behavior vivid.Behavior // 行为
// ConnectionSetPacketHandlerMessage 切换响应数据包消息行为
ConnectionSetPacketHandlerMessage struct {
Handler ConnPacketHandler
}

// ConnectionReactPacketMessage 连接响应数据包消息
ConnectionReactPacketMessage struct {
Packet Packet // 数据包
// ConnectionSetTerminateHandlerMessage 设置连接终止处理器消息
ConnectionSetTerminateHandlerMessage struct {
Handler ConnTerminateHandler
}

// ConnectionWritePacketMessage 连接写入数据包消息
ConnectionWritePacketMessage struct {
Packet Packet
// ConnectionReactPacketMessage 连接响应数据包消息
ConnectionReactPacketMessage struct {
Packet // 数据包
}

// ConnectionLoadModMessage 加载模组消息
Expand Down
28 changes: 16 additions & 12 deletions minotaur/transport/conn_actor_typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ package transport

import "github.com/kercylan98/minotaur/minotaur/vivid"

type Conn = vivid.TypedActorRef[ConnActorTyped]

type ConnActorTyped interface {
// Write 向连接内写入数据包
Write(packet Packet)

// Close 关闭连接
Close()

// BecomeReactPacketMessage 切换响应数据包消息行为,默认情况下会发布 ConnectionReceivePacketEvent 事件,可以通过 SubscribePacketReceivedEvent 订阅
BecomeReactPacketMessage(handler func(vivid.MessageContext, ConnectionReactPacketMessage))

// SubscribePacketReceivedEvent 订阅数据包接收事件
SubscribePacketReceivedEvent(subscriber vivid.Subscriber)

// LoadMod 加载模组
LoadMod(mods ...vivid.ModInfo)

Expand All @@ -23,26 +19,34 @@ type ConnActorTyped interface {

// ApplyMod 应用模组
ApplyMod()

// SetPacketHandler 设置数据包处理器
// - 该函数是通过 become 实现,并且会丢弃旧的行为
SetPacketHandler(handler ConnPacketHandler)

// SetTerminateHandler 设置连接终止处理器
SetTerminateHandler(handler ConnTerminateHandler)
}

type ConnActorTypedImpl struct {
ConnActorRef vivid.ActorRef
ConnActorRef vivid.ActorRef
ConnWriterActorRef vivid.ActorRef
}

func (c *ConnActorTypedImpl) Write(packet Packet) {
c.ConnActorRef.Tell(ConnectionWritePacketMessage{Packet: packet})
c.ConnWriterActorRef.Tell(packet)
}

func (c *ConnActorTypedImpl) Close() {
c.ConnActorRef.Stop()
}

func (c *ConnActorTypedImpl) SubscribePacketReceivedEvent(subscriber vivid.Subscriber) {
c.ConnActorRef.GetSystem().Subscribe(subscriber, ConnectionReceivePacketEvent{})
func (c *ConnActorTypedImpl) SetPacketHandler(handler ConnPacketHandler) {
c.ConnActorRef.Tell(ConnectionSetPacketHandlerMessage{Handler: handler})
}

func (c *ConnActorTypedImpl) BecomeReactPacketMessage(handler func(vivid.MessageContext, ConnectionReactPacketMessage)) {
c.ConnActorRef.Tell(ConnectionBecomeReactPacketMessage{Behavior: vivid.BehaviorOf[ConnectionReactPacketMessage](handler)})
func (c *ConnActorTypedImpl) SetTerminateHandler(handler ConnTerminateHandler) {
c.ConnActorRef.Tell(ConnectionSetTerminateHandlerMessage{Handler: handler})
}

func (c *ConnActorTypedImpl) LoadMod(mods ...vivid.ModInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,23 @@ package transport

import (
"github.com/kercylan98/minotaur/minotaur/vivid"
"github.com/kercylan98/minotaur/toolkit/log"
"net"
)

type ()

type ConnWriter func(packet Packet) error

type ConnWriteActor struct {
conn net.Conn
writer ConnWriter
Writer ConnWriter
}

func (c *ConnWriteActor) OnReceive(ctx vivid.MessageContext) {
switch m := ctx.GetMessage().(type) {
case ConnectionWritePacketMessage:
case Packet:
c.onConnWriteMessage(ctx, m)
}
}

func (c *ConnWriteActor) onConnWriteMessage(ctx vivid.MessageContext, m ConnectionWritePacketMessage) {
if err := c.writer(m.Packet); err != nil {
log.Error("ConnActor write error: %v", err)
func (c *ConnWriteActor) onConnWriteMessage(ctx vivid.MessageContext, m Packet) {
if err := c.Writer(m); err != nil {
panic(err)
}
}
11 changes: 6 additions & 5 deletions minotaur/transport/server_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ func (s *ServerActor) onServerConnOpened(ctx vivid.MessageContext, m ServerConnO
// 创建连接 Actor 并绑定到服务器
connRef := ctx.ActorOf(vivid.OfO(func(options *vivid.ActorOptions[*ConnActor]) {
options.
WithName("conn" + m.conn.RemoteAddr().String()).
WithName("conn-" + m.conn.RemoteAddr().String()).
WithSupervisor(func(message, reason vivid.Message) vivid.Directive {
log.Error("connOpened", log.String("message", reflect.TypeOf(message).String()), log.Any("reason", reason))
return vivid.DirectiveStop
})
}))
connRef.Tell(ConnectionInitMessage{Conn: m.conn, Writer: m.writer}, vivid.WithInstantly(true))
connTyped := vivid.Typed[ConnActorTyped](connRef, &ConnActorTypedImpl{
ConnActorRef: connRef,
})
var connActor *ConnActor
connRef.Tell(ConnectionInitMessage{Conn: m.conn, Writer: m.writer, ActorHook: func(actor *ConnActor) {
connActor = actor
}}, vivid.WithInstantly(true)) // 由于是立即执行,可以直接获取写入器引用
connTyped := connActor.Typed
connExpandTyped := vivid.Typed[ConnActorExpandTyped](connRef, &ConnActorExpandTypedImpl{
ConnActorRef: connRef,
})
Expand Down
13 changes: 9 additions & 4 deletions minotaur/vivid/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ActorContext interface {
GetParent() ActorRef

// Become 切换 Actor 在面对特定消息的行为。通过调用 BehaviorOf 函数可以创建一个特定消息类型的行为,将 Actor 的消息处理逻辑切换为指定的新行为。新的行为会覆盖当前的行为,直到下次调用 Become 或 UnBecome 为止
Become(behavior Behavior)
Become(behavior Behavior, discardOld ...bool)

// UnBecome 恢复 Actor 在面对特性消息的行为为之前的行为,多次调用 UnBecome 会依次恢复之前的行为,直到没有行为为止
UnBecome(message Message)
Expand Down Expand Up @@ -81,13 +81,17 @@ func (c *_ActorContext) GetParent() ActorRef {
return c.parent
}

func (c *_ActorContext) Become(behavior Behavior) {
func (c *_ActorContext) Become(behavior Behavior, discardOld ...bool) {
if c.behaviors == nil {
c.behaviors = make(map[reflect.Type][]Behavior)
}

messageType := behavior.getMessageType()
c.behaviors[messageType] = append(c.behaviors[messageType], behavior)
if len(discardOld) > 0 && discardOld[0] {
c.behaviors[messageType] = c.behaviors[messageType][:0]
return
}
c.behaviors[messageType] = append([]Behavior{behavior}, c.behaviors[messageType]...)
}

func (c *_ActorContext) UnBecome(message Message) {
Expand All @@ -98,7 +102,8 @@ func (c *_ActorContext) UnBecome(message Message) {
messageType := reflect.TypeOf(message)
if behaviors, ok := c.behaviors[messageType]; ok {
if len(behaviors) > 0 {
c.behaviors[messageType] = behaviors[:len(behaviors)-1]
behaviors = behaviors[1:]
c.behaviors[messageType] = behaviors
}
if len(behaviors) == 0 {
delete(c.behaviors, messageType)
Expand Down
2 changes: 2 additions & 0 deletions minotaur/vivid/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"
)

// NewActorSystem 创建一个 ActorSystem,ActorSystem 是 Actor 的容器,用于管理 Actor 的生命周期、消息分发等
func NewActorSystem(name string) ActorSystem {
s := ActorSystem{
dispatchers: make(map[DispatcherId]Dispatcher),
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewActorSystem(name string) ActorSystem {
return s
}

// ActorSystem Actor 系统
type ActorSystem struct {
logger *atomic.Pointer[log.Logger]
core *_ActorSystemCore
Expand Down
10 changes: 10 additions & 0 deletions minotaur/vivid/actor_system_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package vivid_test

import "github.com/kercylan98/minotaur/minotaur/vivid"

func ExampleNewActorSystem() {
vivid.NewActorSystem("example")

// Output:
//
}
4 changes: 2 additions & 2 deletions minotaur/vivid/internal_actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func (c *_internalActorContext) matchBehavior(message Message) Behavior {
}

behaviors, ok := c.behaviors[reflect.TypeOf(message)]
if !ok {
if !ok || len(behaviors) == 0 {
return nil
}

return behaviors[len(behaviors)-1]
return behaviors[0]
}

func (c *_internalActorContext) getParent() ActorContext {
Expand Down
6 changes: 3 additions & 3 deletions minotaur/vivid/message_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type MessageContext interface {
Instantly() bool

// Become 该函数是 ActorContext.Become 的快捷方式
Become(behavior Behavior)
Become(behavior Behavior, discardOld ...bool)

// UnBecome 该函数是 ActorContext.UnBecome 的快捷方式
UnBecome(message Message)
Expand Down Expand Up @@ -266,8 +266,8 @@ func (c *_MessageContext) Instantly() bool {
return c.InstantlyExec
}

func (c *_MessageContext) Become(behavior Behavior) {
c.GetContext().Become(behavior)
func (c *_MessageContext) Become(behavior Behavior, discardOld ...bool) {
c.GetContext().Become(behavior, discardOld...)
}

func (c *_MessageContext) UnBecome(message Message) {
Expand Down
2 changes: 2 additions & 0 deletions minotaur/vivid/message_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (o *MessageOptions) apply(options []MessageOption) *MessageOptions {
}

// WithInstantly 设置消息是否立即执行,如果设置为立即执行,消息将会被立即执行,否则将会被放入邮箱等待执行
// - 由于没有放入邮箱等待执行,该消息是在当前线程中执行的,如果存在循环调用,可能会导致死锁
// - 该可选项在跨网络调用时可能不会产生效果
// - 该可选项将提供给 Dispatcher 进行处理,根据不同的 Dispatcher 实现,该可选项可能会被忽略
func WithInstantly(instantly bool) MessageOption {
return func(options *MessageOptions) {
Expand Down
4 changes: 2 additions & 2 deletions toolkit/log/gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type GNetLogger struct {
}

func (l *GNetLogger) Debugf(format string, args ...interface{}) {
l.Logger.Debug("GNET", String("msg", fmt.Sprintf(format, args...)))
//l.Logger.Debug("GNET", String("msg", fmt.Sprintf(format, args...)))
}

func (l *GNetLogger) Infof(format string, args ...interface{}) {
l.Logger.Info("GNET", String("msg", fmt.Sprintf(format, args...)))
//l.Logger.Info("GNET", String("msg", fmt.Sprintf(format, args...)))
}

func (l *GNetLogger) Warnf(format string, args ...interface{}) {
Expand Down
4 changes: 2 additions & 2 deletions toolkit/log/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type Handler struct {
}

func (h *Handler) Enabled(ctx context.Context, level slog.Level) bool {
return level >= h.opts.Level
return level >= h.opts.leveler.Level()
}

func (h *Handler) Handle(ctx context.Context, record slog.Record) error {
if !h.Enabled(ctx, h.opts.Level) {
if !h.Enabled(ctx, h.opts.leveler.Level()) {
return nil
}

Expand Down
Loading

0 comments on commit 0bf8601

Please sign in to comment.