Skip to content

Commit

Permalink
refactor: 优化及重构 server 包关于 WebSocket 的消息类型和消息分流部分内容
Browse files Browse the repository at this point in the history
- 优化 server 包中 WebSocket 服务器默认响应的消息类型与发信方不同步的问题;
- 移除 server.WithShunt
函数,调整为通过 server.Server.UseShunt 来动态分流渠道,例如可以将用户连接的渠道在用户自身渠道或游戏房间渠道来回切换;
  • Loading branch information
kercylan98 committed Dec 1, 2023
1 parent 8e94a66 commit dc557a0
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 255 deletions.
20 changes: 9 additions & 11 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ func newBotConn(server *Server) *Conn {
return c
}

// Conn 服务器连接单次会话的包装
// Conn 服务器连接单次消息的包装
type Conn struct {
*connection
wst int
ctx context.Context
}

Expand Down Expand Up @@ -160,7 +161,7 @@ func (slf *Conn) GetWebsocketRequest() *http.Request {

// IsBot 是否是机器人连接
func (slf *Conn) IsBot() bool {
return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
}

// RemoteAddr 获取远程地址
Expand Down Expand Up @@ -229,15 +230,15 @@ func (slf *Conn) IsWebsocket() bool {
return slf.server.network == NetworkWebsocket
}

// GetWST 获取websocket消息类型
// GetWST 获取本次 websocket 消息类型
// - 默认将与发送类型相同
func (slf *Conn) GetWST() int {
wst, _ := slf.ctx.Value(contextKeyWST).(int)
return wst
return slf.wst
}

// SetWST 设置websocket消息类型
// SetWST 设置本次 websocket 消息类型
func (slf *Conn) SetWST(wst int) *Conn {
slf.ctx = context.WithValue(slf.ctx, contextKeyWST, wst)
slf.wst = wst
return slf
}

Expand All @@ -255,7 +256,6 @@ func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callba
}

// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
if slf.gw != nil {
slf.gw(packet)
Expand Down Expand Up @@ -356,9 +356,7 @@ func (slf *Conn) Close(err ...error) {
if slf.ticker != nil {
slf.ticker.Release()
}
if slf.server.shuntMatcher != nil {
slf.server.releaseDispatcher(slf.server.shuntMatcher(slf))
}
slf.server.releaseDispatcher(slf)
slf.pool.Close()
slf.loop.Close()
slf.mu.Unlock()
Expand Down
4 changes: 0 additions & 4 deletions server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,3 @@ const (
DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
)

const (
contextKeyWST = "_wst" // WebSocket 消息类型
)
4 changes: 3 additions & 1 deletion server/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
var dispatcherUnique = struct{}{}

// generateDispatcher 生成消息分发器
func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{
name: name,
buffer: buffer.NewUnboundedN[*Message](),
handler: handler,
uniques: haxmap.New[string, struct{}](),
Expand All @@ -18,6 +19,7 @@ func generateDispatcher(handler func(dispatcher *dispatcher, message *Message))

// dispatcher 消息分发器
type dispatcher struct {
name string
buffer *buffer.Unbounded[*Message]
uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message)
Expand Down
257 changes: 130 additions & 127 deletions server/event.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions server/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (

// MessageTypeSystem 系统消息类型
MessageTypeSystem

// MessageTypeShunt 普通分流消息类型
MessageTypeShunt
)

var messageNames = map[MessageType]string{
Expand All @@ -60,6 +63,7 @@ var messageNames = map[MessageType]string{
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
MessageTypeShunt: "MessageTypeShunt",
}

const (
Expand Down Expand Up @@ -209,3 +213,9 @@ func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mar
slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark
return slf
}

// castToShuntMessage 将消息转换为分流消息
func (slf *Message) castToShuntMessage(conn *Conn, caller func(), mark ...log.Field) *Message {
slf.t, slf.conn, slf.ordinaryHandler, slf.marks = MessageTypeShunt, conn, caller, mark
return slf
}
2 changes: 1 addition & 1 deletion server/multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (slf *MultipleServer) Run() {
go func(address string, server *Server) {
var lock sync.Mutex
var startFinish bool
server.startFinishEventHandles.Append(func(srv *Server) {
server.startFinishEventHandlers.Append(func(srv *Server) {
lock.Lock()
defer lock.Unlock()
if !startFinish {
Expand Down
52 changes: 12 additions & 40 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"google.golang.org/grpc"
"runtime/debug"
"time"
)

Expand All @@ -29,19 +28,18 @@ type option struct {
}

type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
shuntMatcher func(conn *Conn) string // 分流匹配器
packetWarnSize int // 数据包大小警告
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告
}

// WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志
Expand All @@ -58,32 +56,6 @@ func WithPacketWarnSize(size int) Option {
}
}

// WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
//
// 注意事项:
// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志
func WithShunt(shuntMatcher func(conn *Conn) string) Option {
return func(srv *Server) {
if shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
return
}
srv.shuntMatcher = func(conn *Conn) string {
defer func() {
if err := recover(); err != nil {
log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack())))
}
}()
return shuntMatcher(conn)
}
}
}

// WithLimitLife 通过限制最大生命周期的方式创建服务器
// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭
func WithLimitLife(t time.Duration) Option {
Expand Down
Loading

0 comments on commit dc557a0

Please sign in to comment.