Skip to content

Commit

Permalink
fix: client 包内存溢出、死循环等问题处理
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Aug 24, 2023
1 parent 75a8608 commit 08559d8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
32 changes: 23 additions & 9 deletions server/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ type Client struct {
}

func (slf *Client) Run() error {
var wait = new(sync.WaitGroup)
wait.Add(1)
go slf.writeLoop(wait)
wait.Wait()
var runState = make(chan error)
go func() {
defer func() {
Expand All @@ -42,9 +38,20 @@ func (slf *Client) Run() error {
}()
err := <-runState
if err != nil {
slf.Close()
slf.mutex.Lock()
if slf.packetPool != nil {
slf.packetPool.Close()
slf.packetPool = nil
}
slf.accumulate = append(slf.accumulate, slf.packets...)
slf.packets = nil
slf.mutex.Unlock()
return err
}
var wait = new(sync.WaitGroup)
wait.Add(1)
go slf.writeLoop(wait)
wait.Wait()
slf.OnConnectionOpenedEvent(slf)
return nil
}
Expand Down Expand Up @@ -83,16 +90,23 @@ func (slf *Client) Write(packet []byte, callback ...func(err error)) {
// write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Client) write(wst int, packet []byte, callback ...func(err error)) {
if slf.packetPool == nil {
var p = &Packet{
wst: wst,
data: packet,
}
if len(callback) > 0 {
p.callback = callback[0]
}
slf.accumulate = append(slf.accumulate, p)
return
}
cp := slf.packetPool.Get()
cp.wst = wst
cp.data = packet
if len(callback) > 0 {
cp.callback = callback[0]
}
if slf.packetPool == nil {
slf.accumulate = append(slf.accumulate, cp)
return
}
slf.mutex.Lock()
slf.packets = append(slf.packets, cp)
slf.mutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions server/client/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (slf *Websocket) Run(runState chan<- error, receive func(wst int, packet []
return
}
slf.conn = ws
slf.clsoed = false
runState <- nil
for !slf.clsoed {
messageType, packet, readErr := ws.ReadMessage()
Expand Down
8 changes: 6 additions & 2 deletions server/gateway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (slf *Endpoint) Connect() {
slf.state = slf.evaluator(float64(time.Now().UnixNano() - cur))
break
}
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
}
}

Expand Down Expand Up @@ -98,5 +98,9 @@ func (slf *Endpoint) onConnectionReceivePacket(conn *client.Client, wst int, pac
panic(err)
}
slf.state = slf.evaluator(float64(time.Now().UnixNano() - sendTime))
slf.GetLink(addr).SetWST(wst).Write(packet)
cli := slf.GetLink(addr)
if cli == nil {
return
}
cli.SetWST(wst).Write(packet)
}
5 changes: 0 additions & 5 deletions server/gateway/gateway_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gateway_test

import (
"fmt"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/client"
"github.com/kercylan98/minotaur/server/gateway"
Expand All @@ -11,9 +10,6 @@ import (

func TestGateway_RunEndpointServer(t *testing.T) {
srv := server.New(server.NetworkWebsocket, server.WithDeadlockDetect(time.Second*3))
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
fmt.Println(err)
})
srv.RegConnectionPacketPreprocessEvent(func(srv *server.Server, conn *server.Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) {
addr, packet, err := gateway.UnmarshalGatewayOutPacket(packet)
if err != nil {
Expand All @@ -35,7 +31,6 @@ func TestGateway_RunEndpointServer(t *testing.T) {
return packet
})
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
fmt.Println("endpoint receive packet", string(packet))
conn.Write(packet)
})
if err := srv.Run(":8889"); err != nil {
Expand Down

0 comments on commit 08559d8

Please sign in to comment.