Skip to content

Commit

Permalink
feat: server 包新增 WithDisableAutomaticReleaseShunt 可选项,可禁止分流渠道自动释放。增加 …
Browse files Browse the repository at this point in the history
…ReleaseShunt、HasShunt、GetShuntNum 等函数。优化系统分流渠道将不再能够被释放
  • Loading branch information
kercylan98 committed Dec 25, 2023
1 parent ceffa2e commit d9ef347
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 30 deletions.
6 changes: 5 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,11 @@ func (slf *Conn) Close(err ...error) {
if slf.ticker != nil {
slf.ticker.Release()
}
slf.server.releaseDispatcher(slf)
if !slf.server.runtime.disableAutomaticReleaseShunt {
slf.server.releaseDispatcher(slf)
} else {

}
slf.loop.Close()
slf.mu.Unlock()
if len(err) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
const (
serverMultipleMark = "Minotaur Multiple Server"
serverMark = "Minotaur Server"
serverSystemDispatcher = "system" // 系统消息分发器
serverSystemDispatcher = "__system" // 系统消息分发器
)

const (
Expand Down
2 changes: 1 addition & 1 deletion server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler,
}

func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
slf.PushSystemMessage(func() {
slf.PushShuntMessage(conn, func() {
slf.Server.online.Delete(conn.GetID())
slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(slf.Server, conn, err)
Expand Down
45 changes: 27 additions & 18 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,33 @@ type option struct {
}

type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告
messageStatisticsDuration time.Duration // 消息统计时长
messageStatisticsLimit int // 消息统计数量
messageStatistics []*atomic.Int64 // 消息统计数量
messageStatisticsLock *sync.RWMutex // 消息统计锁
dispatcherBufferSize int // 消息分发器缓冲区大小
connWriteBufferSize int // 连接写入缓冲区大小
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告
messageStatisticsDuration time.Duration // 消息统计时长
messageStatisticsLimit int // 消息统计数量
messageStatistics []*atomic.Int64 // 消息统计数量
messageStatisticsLock *sync.RWMutex // 消息统计锁
dispatcherBufferSize int // 消息分发器缓冲区大小
connWriteBufferSize int // 连接写入缓冲区大小
disableAutomaticReleaseShunt bool // 是否禁用自动释放分流渠道
}

// WithDisableAutomaticReleaseShunt 通过禁用自动释放分流渠道的方式创建服务器
// - 默认不开启,当禁用自动释放分流渠道时,服务器将不会在连接断开时自动释放分流渠道,需要手动调用 ReleaseShunt 方法释放
func WithDisableAutomaticReleaseShunt() Option {
return func(srv *Server) {
srv.runtime.disableAutomaticReleaseShunt = true
}
}

// WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器
Expand Down
56 changes: 47 additions & 9 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func (slf *Server) GetMessageCount() int64 {

// UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道
// - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发
// - 在使用 WithDisableAutomaticReleaseShunt 创建服务器后,必须始终在连接不再使用后主动通过 ReleaseShunt 释放消息分流渠道,否则将造成内存泄漏
func (slf *Server) UseShunt(conn *Conn, name string) {
slf.dispatcherLock.Lock()
defer slf.dispatcherLock.Unlock()
Expand All @@ -578,7 +579,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
}

delete(slf.dispatcherMember[curr.name], conn.GetID())
if len(slf.dispatcherMember[curr.name]) == 0 {
if curr.name != serverSystemDispatcher && len(slf.dispatcherMember[curr.name]) == 0 {
delete(slf.dispatchers, curr.name)
curr.transfer(d)
curr.close()
Expand All @@ -595,6 +596,32 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
member[conn.GetID()] = conn
}

// HasShunt 检查特定消息分流渠道是否存在
func (slf *Server) HasShunt(name string) bool {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
_, exist := slf.dispatchers[name]
return exist
}

// GetConnCurrShunt 获取连接当前所使用的消息分流渠道
func (slf *Server) GetConnCurrShunt(conn *Conn) string {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
d, exist := slf.currDispatcher[conn.GetID()]
if exist {
return d.name
}
return serverSystemDispatcher
}

// GetShuntNum 获取消息分流渠道数量
func (slf *Server) GetShuntNum() int {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
return len(slf.dispatchers)
}

// getConnDispatcher 获取连接所使用的消息分发器
func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
if conn == nil {
Expand All @@ -609,21 +636,29 @@ func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
return slf.systemDispatcher
}

// ReleaseShunt 释放分流渠道中的连接,当分流渠道中不再存在连接时将会自动释放分流渠道
// - 在未使用 WithDisableAutomaticReleaseShunt 选项时,当连接关闭时将会自动释放分流渠道中连接的资源占用
// - 若执行过程中连接正在使用,将会切换至系统通道
func (slf *Server) ReleaseShunt(conn *Conn) {
slf.releaseDispatcher(conn)
}

// releaseDispatcher 关闭消息分发器
func (slf *Server) releaseDispatcher(conn *Conn) {
if conn == nil {
return
}
cid := conn.GetID()
slf.dispatcherLock.Lock()
defer slf.dispatcherLock.Unlock()
d, exist := slf.currDispatcher[conn.GetID()]
if exist {
delete(slf.dispatcherMember[d.name], conn.GetID())
d, exist := slf.currDispatcher[cid]
if exist && d.name != serverSystemDispatcher {
delete(slf.dispatcherMember[d.name], cid)
if len(slf.dispatcherMember[d.name]) == 0 {
d.close()
delete(slf.dispatchers, d.name)
}
delete(slf.currDispatcher, conn.GetID())
delete(slf.currDispatcher, cid)
}
}

Expand Down Expand Up @@ -662,7 +697,10 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s))
}
}
var fields = make([]log.Field, 0, len(message.marks)+4)
var fields = make([]log.Field, 0, len(message.marks)+5)
if message.conn != nil {
fields = append(fields, log.String("shunt", slf.GetConnCurrShunt(message.conn)))
}
fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String()))
fields = append(fields, message.marks...)
//fields = append(fields, log.Stack("stack"))
Expand Down Expand Up @@ -823,20 +861,20 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error),
}

// PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发
// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
}

// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
}

// PushPacketMessage 向服务器中推送 MessageTypePacket 消息
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
// - 当存在 UseShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
&Conn{wst: wst, connection: conn.connection},
Expand Down

0 comments on commit d9ef347

Please sign in to comment.