Skip to content

Commit

Permalink
fix memory leak by enabling heartbeat to release stale connections (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
windycrypto authored Aug 21, 2023
1 parent ae388da commit 35fa4f7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 62 deletions.
4 changes: 3 additions & 1 deletion example/example-config.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
redis_config:
server_addr: "redis:6379"
server_addr: 127.0.0.1:6379
metric_config:
enabled: true
2 changes: 1 addition & 1 deletion log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func ProductionModeWithoutStackTrace() {
config := zap.NewProductionConfig()
config.EncoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
config.DisableStacktrace = true
config.OutputPaths = append(config.OutputPaths, "stdout")
//config.OutputPaths = append(config.OutputPaths)

buildLoggerWithConfig(config)
}
Expand Down
50 changes: 40 additions & 10 deletions relay/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/RabbyHub/derelay/log"
Expand All @@ -18,12 +19,13 @@ type client struct {
conn *websocket.Conn
ws *WsServer

id string // randomly generate, just for logging
active bool // heartbeat related
role RoleType // dapp or wallet
session string // session id
pubTopics *TopicSet
subTopics *TopicSet
id string // randomly generate, just for logging
active bool // heartbeat related
terminated atomic.Bool //
role RoleType // dapp or wallet
session string // session id
pubTopics *TopicSet
subTopics *TopicSet

sendbuf chan SocketMessage // send buffer
ping chan struct{}
Expand All @@ -41,13 +43,30 @@ func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
return nil
}

func (c *client) heartbeat() {

c.conn.SetPongHandler(func(appData string) error {
c.active = true
return nil
})

for {
if !c.active {
c.terminate(fmt.Errorf("heartbeat fail"))
return
}
c.active = false

_ = c.conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
<-time.After(10 * time.Second)
}
}

func (c *client) read() {
for {
_, m, err := c.conn.ReadMessage()
if err != nil {
c.quit <- struct{}{}
c.conn.Close()
c.ws.unregister <- ClientUnregisterEvent{client: c, reason: err}
c.terminate(err)
return
}

Expand Down Expand Up @@ -81,6 +100,8 @@ func (c *client) write() {
err := c.conn.WriteMessage(websocket.TextMessage, m.Bytes())
if err != nil {
log.Error("client write error", err, zap.Any("client", c), zap.Any("message", message))
c.terminate(err)
return
}
case _, more := <-c.ping:
if !more {
Expand All @@ -101,6 +122,15 @@ func (c *client) send(message SocketMessage) {
case c.sendbuf <- message:
default:
metrics.IncSendBlocking(len(c.sendbuf))
log.Error("sending to client blocked", fmt.Errorf("sendbuf full"), zap.Any("client", c), zap.Any("len(sendbuf)", len(c.sendbuf)))
log.Error("sending to client blocked", fmt.Errorf("sendbuf full"), zap.Any("client", c), zap.Any("len(sendbuf)", len(c.sendbuf)), zap.Any("message", message))
}
}

func (c *client) terminate(reason error) {
if c.terminated.CompareAndSwap(false, true) {
c.active = false
c.quit <- struct{}{}
c.conn.Close()
c.ws.unregister <- ClientUnregisterEvent{client: c, reason: reason}
}
}
58 changes: 8 additions & 50 deletions relay/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package relay
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/RabbyHub/derelay/config"
"github.com/RabbyHub/derelay/log"
Expand Down Expand Up @@ -79,20 +77,16 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) {
ws: ws,
pubTopics: NewTopicSet(),
subTopics: NewTopicSet(),
sendbuf: make(chan SocketMessage, 256),
sendbuf: make(chan SocketMessage, 8),
ping: make(chan struct{}, 8),
quit: make(chan struct{}),
}

conn.SetPongHandler(func(appData string) error {
client.active = true
return nil
})

ws.register <- client

go client.read()
go client.write()
go client.heartbeat()
}

func (ws *WsServer) Run() {
Expand All @@ -119,10 +113,6 @@ func (ws *WsServer) Run() {
go ws.subMessage(message)
log.Info("local message", zap.Any("client", message.client), zap.Any("message", message))
case Ping:
//if rand.Intn(10000) == 0 {
// we need ping message to help us debug, but it's too many so we reduce the log a bit
//log.Info("local message", zap.Any("client", message.client), zap.Any("message", message))
//}
ws.handlePingMessage(message)
}
case chmessage := <-remoteCh:
Expand Down Expand Up @@ -163,17 +153,18 @@ func (ws *WsServer) Run() {
case client := <-ws.register:
log.Info("new client connection", zap.Any("client", client))
metrics.IncNewConnection()
metrics.SetCurrentConnections(len(ws.clients))
ws.clients[client] = struct{}{}
metrics.SetCurrentConnections(len(ws.clients))

case unregisterEvent := <-ws.unregister:
metrics.IncClosedConnection()
metrics.SetCurrentConnections(len(ws.clients))
client, reason := unregisterEvent.client, unregisterEvent.reason
log.Info("client disconnected", zap.Any("client", client), zap.String("reason", reason.Error()))

delete(ws.clients, client)
ws.handleClientDisconnect(client)
delete(ws.clients, client)

metrics.IncClosedConnection()
metrics.SetCurrentConnections(len(ws.clients))
log.Info("client disconnected", zap.Any("client", client), zap.String("reason", reason.Error()))
}
}
}
Expand Down Expand Up @@ -230,38 +221,5 @@ func (ws *WsServer) getCachedMessages(topic string, clear bool) []SocketMessage
return notifications
}

func (ws *WsServer) handleHeartbeat() {
for c := range ws.clients {
if !c.active {
ws.unregister <- ClientUnregisterEvent{client: c, reason: fmt.Errorf("heartbeat fail")}
continue
}
c.active = false
// ping
c.ping <- struct{}{}
}
}

func (ws *WsServer) checkSessionExpiration() {
now := time.Now()
for {
session := ws.pendingSessions.peak()
if session == nil || session.expireTime.After(now) {
break
}

metrics.IncExpiredSessions()
log.Info("[wsserver] session expired, notify dapp", zap.String("topic", session.topic))
session.dapp.send(SocketMessage{
Topic: session.topic,
Type: Pub,
Role: string(Relay),
Phase: string(SessionExpired),
})

ws.pendingSessions.pop()
}
}

func (ws *WsServer) Shutdown() {
}

0 comments on commit 35fa4f7

Please sign in to comment.