diff --git a/server/conn.go b/server/conn.go index 3e3c7a66..3f96a6cd 100644 --- a/server/conn.go +++ b/server/conn.go @@ -359,7 +359,11 @@ func (slf *Conn) Close(err ...error) { if slf.ticker != nil { slf.ticker.Release() } - slf.server.releaseDispatcher(slf) + if !slf.server.runtime.disableAutomaticReleaseShunt { + slf.server.releaseDispatcher(slf) + } else { + + } slf.loop.Close() slf.mu.Unlock() if len(err) > 0 { diff --git a/server/constants.go b/server/constants.go index e190685e..8bd70982 100644 --- a/server/constants.go +++ b/server/constants.go @@ -7,7 +7,7 @@ import ( const ( serverMultipleMark = "Minotaur Multiple Server" serverMark = "Minotaur Server" - serverSystemDispatcher = "system" // 系统消息分发器 + serverSystemDispatcher = "__system" // 系统消息分发器 ) const ( diff --git a/server/event.go b/server/event.go index d79b9ebe..bfd77f80 100644 --- a/server/event.go +++ b/server/event.go @@ -206,7 +206,7 @@ func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler, } func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { - slf.PushSystemMessage(func() { + slf.PushShuntMessage(conn, func() { slf.Server.online.Delete(conn.GetID()) slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { value(slf.Server, conn, err) diff --git a/server/options.go b/server/options.go index d69f1476..19dc7725 100644 --- a/server/options.go +++ b/server/options.go @@ -30,24 +30,33 @@ type option struct { } type runtime struct { - deadlockDetect time.Duration // 是否开启死锁检测 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - tickerPool *timer.Pool // 定时器池 - ticker *timer.Ticker // 定时器 - tickerAutonomy bool // 定时器是否独立运行 - connTickerSize int // 连接定时器大小 - websocketReadDeadline time.Duration // websocket连接超时时间 - websocketCompression int // websocket压缩等级 - websocketWriteCompression bool // websocket写入压缩 - limitLife time.Duration // 限制最大生命周期 - packetWarnSize int // 数据包大小警告 - messageStatisticsDuration time.Duration // 消息统计时长 - messageStatisticsLimit int // 消息统计数量 - messageStatistics []*atomic.Int64 // 消息统计数量 - messageStatisticsLock *sync.RWMutex // 消息统计锁 - dispatcherBufferSize int // 消息分发器缓冲区大小 - connWriteBufferSize int // 连接写入缓冲区大小 + deadlockDetect time.Duration // 是否开启死锁检测 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + tickerPool *timer.Pool // 定时器池 + ticker *timer.Ticker // 定时器 + tickerAutonomy bool // 定时器是否独立运行 + connTickerSize int // 连接定时器大小 + websocketReadDeadline time.Duration // websocket连接超时时间 + websocketCompression int // websocket压缩等级 + websocketWriteCompression bool // websocket写入压缩 + limitLife time.Duration // 限制最大生命周期 + packetWarnSize int // 数据包大小警告 + messageStatisticsDuration time.Duration // 消息统计时长 + messageStatisticsLimit int // 消息统计数量 + messageStatistics []*atomic.Int64 // 消息统计数量 + messageStatisticsLock *sync.RWMutex // 消息统计锁 + dispatcherBufferSize int // 消息分发器缓冲区大小 + connWriteBufferSize int // 连接写入缓冲区大小 + disableAutomaticReleaseShunt bool // 是否禁用自动释放分流渠道 +} + +// WithDisableAutomaticReleaseShunt 通过禁用自动释放分流渠道的方式创建服务器 +// - 默认不开启,当禁用自动释放分流渠道时,服务器将不会在连接断开时自动释放分流渠道,需要手动调用 ReleaseShunt 方法释放 +func WithDisableAutomaticReleaseShunt() Option { + return func(srv *Server) { + srv.runtime.disableAutomaticReleaseShunt = true + } } // WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器 diff --git a/server/server.go b/server/server.go index bd67c8de..b70094c2 100644 --- a/server/server.go +++ b/server/server.go @@ -561,6 +561,7 @@ func (slf *Server) GetMessageCount() int64 { // UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道 // - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发 +// - 在使用 WithDisableAutomaticReleaseShunt 创建服务器后,必须始终在连接不再使用后主动通过 ReleaseShunt 释放消息分流渠道,否则将造成内存泄漏 func (slf *Server) UseShunt(conn *Conn, name string) { slf.dispatcherLock.Lock() defer slf.dispatcherLock.Unlock() @@ -578,7 +579,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) { } delete(slf.dispatcherMember[curr.name], conn.GetID()) - if len(slf.dispatcherMember[curr.name]) == 0 { + if curr.name != serverSystemDispatcher && len(slf.dispatcherMember[curr.name]) == 0 { delete(slf.dispatchers, curr.name) curr.transfer(d) curr.close() @@ -595,6 +596,32 @@ func (slf *Server) UseShunt(conn *Conn, name string) { member[conn.GetID()] = conn } +// HasShunt 检查特定消息分流渠道是否存在 +func (slf *Server) HasShunt(name string) bool { + slf.dispatcherLock.RLock() + defer slf.dispatcherLock.RUnlock() + _, exist := slf.dispatchers[name] + return exist +} + +// GetConnCurrShunt 获取连接当前所使用的消息分流渠道 +func (slf *Server) GetConnCurrShunt(conn *Conn) string { + slf.dispatcherLock.RLock() + defer slf.dispatcherLock.RUnlock() + d, exist := slf.currDispatcher[conn.GetID()] + if exist { + return d.name + } + return serverSystemDispatcher +} + +// GetShuntNum 获取消息分流渠道数量 +func (slf *Server) GetShuntNum() int { + slf.dispatcherLock.RLock() + defer slf.dispatcherLock.RUnlock() + return len(slf.dispatchers) +} + // getConnDispatcher 获取连接所使用的消息分发器 func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher { if conn == nil { @@ -609,21 +636,29 @@ func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher { return slf.systemDispatcher } +// ReleaseShunt 释放分流渠道中的连接,当分流渠道中不再存在连接时将会自动释放分流渠道 +// - 在未使用 WithDisableAutomaticReleaseShunt 选项时,当连接关闭时将会自动释放分流渠道中连接的资源占用 +// - 若执行过程中连接正在使用,将会切换至系统通道 +func (slf *Server) ReleaseShunt(conn *Conn) { + slf.releaseDispatcher(conn) +} + // releaseDispatcher 关闭消息分发器 func (slf *Server) releaseDispatcher(conn *Conn) { if conn == nil { return } + cid := conn.GetID() slf.dispatcherLock.Lock() defer slf.dispatcherLock.Unlock() - d, exist := slf.currDispatcher[conn.GetID()] - if exist { - delete(slf.dispatcherMember[d.name], conn.GetID()) + d, exist := slf.currDispatcher[cid] + if exist && d.name != serverSystemDispatcher { + delete(slf.dispatcherMember[d.name], cid) if len(slf.dispatcherMember[d.name]) == 0 { d.close() delete(slf.dispatchers, d.name) } - delete(slf.currDispatcher, conn.GetID()) + delete(slf.currDispatcher, cid) } } @@ -662,7 +697,10 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s)) } } - var fields = make([]log.Field, 0, len(message.marks)+4) + var fields = make([]log.Field, 0, len(message.marks)+5) + if message.conn != nil { + fields = append(fields, log.String("shunt", slf.GetConnCurrShunt(message.conn))) + } fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) fields = append(fields, message.marks...) //fields = append(fields, log.Stack("stack")) @@ -823,20 +861,20 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error), } // PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncMessage 进行转发 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) } // PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) } // PushPacketMessage 向服务器中推送 MessageTypePacket 消息 -// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 +// - 当存在 UseShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToPacketMessage( &Conn{wst: wst, connection: conn.connection},