Skip to content

Commit

Permalink
feat: server 包新增 WithLimitLift 和 WithConnectionMessageChannelSize 函数,…
Browse files Browse the repository at this point in the history
…用于限制服务器最大生命周期及连接的消息写入通道大小
  • Loading branch information
kercylan98 committed Sep 9, 2023
1 parent a938bf5 commit 064d434
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 13 deletions.
10 changes: 5 additions & 5 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, 1024*10),
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: session.RemoteAddr(),
ip: session.RemoteAddr().String(),
Expand All @@ -39,7 +39,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, 1024*10),
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: conn.RemoteAddr(),
ip: conn.RemoteAddr().String(),
Expand All @@ -62,7 +62,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, 1024*10),
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: ws.RemoteAddr(),
ip: ip,
Expand All @@ -82,7 +82,7 @@ func newGatewayConn(conn *Conn, connId string) *Conn {
c := &Conn{
//ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, 1024*10),
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: conn.server,
data: map[any]any{},
},
Expand All @@ -98,7 +98,7 @@ func NewEmptyConn(server *Server) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, 1024*10),
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: &net.TCPAddr{},
ip: "0.0.0.0:0",
Expand Down
1 change: 1 addition & 0 deletions server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
DefaultMessageChannelSize = 1024 * 1024
DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second
DefaultConnectionChannelSize = 1024 * 10
)

const (
Expand Down
6 changes: 6 additions & 0 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func (slf *event) OnStartFinishEvent() {
return true
})
}, "StartFinishEvent")
if slf.Server.limitLife > 0 {
go func() {
time.Sleep(slf.Server.limitLife)
slf.Shutdown()
}()
}
}

// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
Expand Down
22 changes: 22 additions & 0 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,28 @@ type runtime struct {
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
connMessageChannelSize int // 连接消息通道大小
}

// WithConnMessageChannelSize 通过指定连接消息通道大小的方式创建服务器
// - 足够大的消息通道可以确保连接在写入消息时不至于阻塞
// - 默认值为 DefaultConnectionChannelSize
func WithConnMessageChannelSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
size = DefaultConnectionChannelSize
}
srv.connMessageChannelSize = size
}
}

// WithLimitLife 通过限制最大生命周期的方式创建服务器
// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭
func WithLimitLife(t time.Duration) Option {
return func(srv *Server) {
srv.limitLife = t
}
}

// WithWebsocketWriteCompression 通过数据写入压缩的方式创建Websocket服务器
Expand Down
6 changes: 5 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ import (
// New 根据特定网络类型创建一个服务器
func New(network Network, options ...Option) *Server {
server := &Server{
runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize},
runtime: &runtime{
messagePoolSize: DefaultMessageBufferSize,
messageChannelSize: DefaultMessageChannelSize,
connMessageChannelSize: DefaultConnectionChannelSize,
},
option: &option{},
network: network,
online: concurrent.NewBalanceMap[string, *Conn](),
Expand Down
18 changes: 12 additions & 6 deletions server/server_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@ import (
)

func ExampleNew() {
srv := server.New(server.NetworkWebsocket,
server.WithDeadlockDetect(time.Second*5),
server.WithPProf("/debug/pprof"),
)

srv := server.New(server.NetworkWebsocket, server.WithLimitLife(time.Millisecond))
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
conn.Write(packet)
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}

// Output:
}

go func() { time.Sleep(1 * time.Second); srv.Shutdown() }()
func ExampleServer_Run() {
srv := server.New(server.NetworkWebsocket, server.WithLimitLife(time.Millisecond))
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
conn.Write(packet)
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestNewClient(t *testing.T) {
for i := 0; i < 1000; i++ {
id := i
fmt.Println("启动", i+1)
cli := client.NewWebsocket("ws://127.0.0.1:9999")
cli := client.NewWebsocket("ws://127.0.0.1:8888")
cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) {
fmt.Println("收到", id+1, string(packet))
})
Expand Down

0 comments on commit 064d434

Please sign in to comment.