Skip to content

Commit

Permalink
perf: 优化 server.Server 连接管理机制,优化 GetOnlineCount、GetOnlineBotCount 性能
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Dec 29, 2023
1 parent f8d8d37 commit 5e5fe8a
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 51 deletions.
1 change: 1 addition & 0 deletions server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
DefaultDispatcherBufferSize = 1024 * 16
DefaultConnWriteBufferSize = 1024 * 1
DefaultConnHubBufferSize = 1024 * 1
)

func DefaultWebsocketUpgrader() *websocket.Upgrader {
Expand Down
4 changes: 2 additions & 2 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler,

func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
slf.PushShuntMessage(conn, func() {
slf.Server.online.Del(conn.GetID())
slf.unregisterConn(conn.GetID())
slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(slf.Server, conn, err)
return true
Expand All @@ -231,7 +231,7 @@ func (slf *event) RegConnectionOpenedEvent(handler ConnectionOpenedEventHandler,

func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
slf.PushSystemMessage(func() {
slf.Server.online.Set(conn.GetID(), conn)
slf.registerConn(conn)
slf.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
value(slf.Server, conn)
return true
Expand Down
153 changes: 153 additions & 0 deletions server/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package server

import (
"context"
"github.com/kercylan98/minotaur/utils/hash"
"sync"
)

type hub struct {
connections map[string]*Conn // 所有连接

register chan *Conn // 注册连接
unregister chan string // 注销连接
broadcast chan hubBroadcast // 广播消息

botCount int // 机器人数量
onlineCount int // 在线人数

chanMutex sync.RWMutex // 避免外界函数导致的并发问题
}

type hubBroadcast struct {
packet []byte // 广播的数据包
filter func(conn *Conn) bool // 过滤掉返回 false 的连接
}

func (h *hub) run(ctx context.Context) {
h.connections = make(map[string]*Conn)
h.register = make(chan *Conn, DefaultConnHubBufferSize)
h.unregister = make(chan string, DefaultConnHubBufferSize)
h.broadcast = make(chan hubBroadcast, DefaultConnHubBufferSize)
go func(ctx context.Context, h *hub) {
for {
select {
case conn := <-h.register:
h.chanMutex.Lock()
h.connections[conn.GetID()] = conn
h.onlineCount++
if conn.IsBot() {
h.botCount++
}
h.chanMutex.Unlock()
case connId := <-h.unregister:
h.chanMutex.Lock()
if conn, ok := h.connections[connId]; ok {
h.onlineCount--
delete(h.connections, conn.GetID())
if conn.IsBot() {
h.botCount--
}
}
h.chanMutex.Unlock()
case packet := <-h.broadcast:
h.chanMutex.RLock()
for _, conn := range h.connections {
if packet.filter != nil && !packet.filter(conn) {
continue
}
conn.Write(packet.packet)
}

case <-ctx.Done():
h.chanMutex.Lock()
close(h.register)
close(h.unregister)
h.connections = nil
h.botCount = 0
h.onlineCount = 0
h.chanMutex.Unlock()
return

}
}
}(ctx, h)
}

// registerConn 注册连接
func (h *hub) registerConn(conn *Conn) {
select {
case h.register <- conn:
default:
}
}

// unregisterConn 注销连接
func (h *hub) unregisterConn(id string) {
select {
case h.unregister <- id:
default:
}
}

// GetOnlineCount 获取在线人数
func (h *hub) GetOnlineCount() int {
h.chanMutex.RLock()
defer h.chanMutex.RUnlock()
return h.onlineCount
}

// GetOnlineBotCount 获取在线机器人数量
func (h *hub) GetOnlineBotCount() int {
h.chanMutex.RLock()
defer h.chanMutex.RUnlock()
return h.botCount
}

// IsOnline 是否在线
func (h *hub) IsOnline(id string) bool {
h.chanMutex.RLock()
_, exist := h.connections[id]
h.chanMutex.RUnlock()
return exist
}

// GetOnlineAll 获取所有在线连接
func (h *hub) GetOnlineAll() map[string]*Conn {
h.chanMutex.RLock()
cop := hash.Copy(h.connections)
h.chanMutex.RUnlock()
return cop
}

// GetOnline 获取在线连接
func (h *hub) GetOnline(id string) *Conn {
h.chanMutex.RLock()
conn := h.connections[id]
h.chanMutex.RUnlock()
return conn
}

// CloseConn 关闭连接
func (h *hub) CloseConn(id string) {
h.chanMutex.RLock()
conn := h.connections[id]
h.chanMutex.RUnlock()
if conn != nil {
conn.Close()
}
}

// Broadcast 广播消息
func (h *hub) Broadcast(packet []byte, filter ...func(conn *Conn) bool) {
m := hubBroadcast{
packet: packet,
}
if len(filter) > 0 {
m.filter = filter[0]
}
select {
case h.broadcast <- m:
default:
}
}
52 changes: 3 additions & 49 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/alphadose/haxmap"
"github.com/gin-gonic/gin"
"github.com/kercylan98/minotaur/server/internal/logger"
"github.com/kercylan98/minotaur/utils/concurrent"
Expand Down Expand Up @@ -36,9 +35,9 @@ func New(network Network, options ...Option) *Server {
dispatcherBufferSize: DefaultDispatcherBufferSize,
connWriteBufferSize: DefaultConnWriteBufferSize,
},
hub: &hub{},
option: &option{},
network: network,
online: haxmap.New[string, *Conn](),
closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1),
dispatchers: make(map[string]*dispatcher),
Expand Down Expand Up @@ -71,6 +70,7 @@ type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
*hub // 连接集合
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
Expand All @@ -80,7 +80,6 @@ type Server struct {
messagePool *concurrent.Pool[*Message] // 消息池
ctx context.Context // 上下文
cancel context.CancelFunc // 停止上下文
online *haxmap.Map[string, *Conn] // 在线连接
systemDispatcher *dispatcher // 系统消息分发器
systemSignal chan os.Signal // 系统信号
closeChannel chan struct{} // 关闭信号
Expand All @@ -106,6 +105,7 @@ func (srv *Server) preCheckAndAdaptation(addr string) (startState <-chan error,
kcp.SystemTimedSched.Close()
}

srv.hub.run(srv.ctx)
return srv.network.adaptation(srv), nil
}

Expand Down Expand Up @@ -174,52 +174,6 @@ func (srv *Server) TimeoutContext(timeout time.Duration) (context.Context, conte
return context.WithTimeout(srv.ctx, timeout)
}

// GetOnlineCount 获取在线人数
func (srv *Server) GetOnlineCount() int {
return int(srv.online.Len())
}

// GetOnlineBotCount 获取在线机器人数量
func (srv *Server) GetOnlineBotCount() int {
var count int
srv.online.ForEach(func(id string, conn *Conn) bool {
if conn.IsBot() {
count++
}
return true
})
return count
}

// GetOnline 获取在线连接
func (srv *Server) GetOnline(id string) *Conn {
c, _ := srv.online.Get(id)
return c
}

// GetOnlineAll 获取所有在线连接
func (srv *Server) GetOnlineAll() map[string]*Conn {
var m = map[string]*Conn{}
srv.online.ForEach(func(id string, conn *Conn) bool {
m[id] = conn
return true
})
return m
}

// IsOnline 是否在线
func (srv *Server) IsOnline(id string) bool {
_, exist := srv.online.Get(id)
return exist
}

// CloseConn 关闭连接
func (srv *Server) CloseConn(id string) {
if conn, exist := srv.online.Get(id); exist {
conn.Close()
}
}

// Ticker 获取服务器定时器
func (srv *Server) Ticker() *timer.Ticker {
if srv.ticker == nil {
Expand Down

0 comments on commit 5e5fe8a

Please sign in to comment.