forked from botuniverse/go-libonebot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
comm_ws.go
100 lines (87 loc) · 2.76 KB
/
comm_ws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package libonebot
import (
"context"
"fmt"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// wsComm is the WebSocket communication method: it carries the state that
// the HTTP handler needs to serve WebSocket connections for one listen address.
type wsComm struct {
// ob is the OneBot instance used for logging, subscribing to events and
// handling incoming action requests.
ob *OneBot
// addr is the "host:port" address the server listens on; it is only used
// in log messages by the handler.
addr string
}
// wsUpgrader upgrades plain HTTP requests to WebSocket connections.
//
// Its CheckOrigin hook unconditionally reports true, so the Origin header of
// the handshake request is never validated and connections from any origin
// are accepted.
var wsUpgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// handle upgrades an incoming HTTP request to a WebSocket connection and
// serves it until the peer disconnects or an I/O error occurs.
//
// Two things happen on the connection concurrently:
//   - a background goroutine pushes every event received from the OneBot
//     event channel to the peer as a text message;
//   - the calling goroutine reads action requests, handles them, and writes
//     each response back using the same message type as the request.
//
// A failed write is logged and terminates the connection instead of being
// silently ignored.
func (comm *wsComm) handle(w http.ResponseWriter, r *http.Request) {
	comm.ob.Logger.Infof("收到来自 %v 的 WebSocket (%v) 连接请求", r.RemoteAddr, comm.addr)
	conn, err := wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		comm.ob.Logger.Errorf("WebSocket (%v) 连接失败, 错误: %v", comm.addr, err)
		return
	}
	comm.ob.Logger.Infof("WebSocket (%v) 连接成功", comm.addr)
	defer conn.Close()

	// The event pusher and the response writer share one connection, and a
	// gorilla/websocket connection supports only one concurrent writer, so
	// every write goes through this lock.
	var connWriteLock sync.Mutex
	write := func(messageType int, data []byte) error {
		connWriteLock.Lock()
		defer connWriteLock.Unlock()
		return conn.WriteMessage(messageType, data)
	}

	eventChan := comm.ob.openEventListenChan()
	defer comm.ob.closeEventListenChan(eventChan)

	go func() {
		// Keep pushing events through the connection. The loop ends once
		// eventChan is closed (presumably by closeEventListenChan — confirm).
		writeFailed := false
		for event := range eventChan {
			if writeFailed {
				// The connection is dead; keep draining so that whoever
				// sends on eventChan is never blocked by this listener.
				continue
			}
			comm.ob.Logger.Debugf("通过 WebSocket (%v) 推送事件 `%v`", comm.addr, event.name)
			if err := write(websocket.TextMessage, event.bytes); err != nil {
				comm.ob.Logger.Errorf("WebSocket (%v) 推送事件 `%v` 失败, 错误: %v", comm.addr, event.name, err)
				writeFailed = true
				// Closing the connection makes the blocked ReadMessage below
				// fail, which tears the whole handler down. Close may be
				// called concurrently with other connection methods.
				conn.Close()
			}
		}
	}()

	for {
		// This is the only place that reads from the connection, so no
		// locking is needed on the read side.
		messageType, messageBytes, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				comm.ob.Logger.Infof("WebSocket (%v) 连接断开", comm.addr)
			} else {
				comm.ob.Logger.Errorf("WebSocket (%v) 连接异常断开, 错误: %v", comm.addr, err)
			}
			return
		}

		// Respond in the same encoding (text or binary) the request used.
		isBinary := messageType == websocket.BinaryMessage
		resp := comm.ob.parseAndHandleActionRequest(messageBytes, isBinary)
		respBytes, err := resp.encode(isBinary)
		if err != nil {
			encodeErr := fmt.Errorf("动作响应编码失败, 错误: %v", err)
			comm.ob.Logger.Warn(encodeErr)
			// Best effort: fall back to a failure response. Its own encoding
			// error is deliberately ignored, as there is nothing better to send.
			respBytes, _ = failedResponse(RetCodeBadActionHandler, encodeErr).encode(isBinary)
		}

		if err := write(messageType, respBytes); err != nil {
			comm.ob.Logger.Errorf("WebSocket (%v) 发送动作响应失败, 错误: %v", comm.addr, err)
			return
		}
	}
}
// commStartWS starts a WebSocket server on c.Host:c.Port that routes every
// path to wsComm.handle, and returns a function that shuts the server down.
//
// The server runs in a background goroutine. The returned closer calls
// http.Server.Shutdown and then waits for that goroutine to exit, so the
// "closed" log line has been written by the time the closer returns.
//
// NOTE: http.Server.Shutdown neither closes nor waits for hijacked
// connections such as WebSockets, so connections that are already
// established are not terminated by the closer.
func commStartWS(c ConfigCommWS, ob *OneBot) commCloser {
	// NOTE(review): "%s:%d" does not bracket IPv6 literals; net.JoinHostPort
	// would be needed if c.Host may be an IPv6 address.
	addr := fmt.Sprintf("%s:%d", c.Host, c.Port)
	ob.Logger.Infof("正在启动 WebSocket (%v)...", addr)

	comm := &wsComm{ob: ob, addr: addr}
	mux := http.NewServeMux()
	mux.HandleFunc("/", comm.handle)
	server := &http.Server{
		Addr:    addr,
		Handler: mux,
	}

	// done is closed when the serving goroutine has finished, including the
	// case where the server failed to start.
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			ob.Logger.Errorf("WebSocket (%v) 启动失败, 错误: %v", addr, err)
		} else {
			ob.Logger.Infof("WebSocket (%v) 已关闭", addr)
		}
	}()

	return func() {
		if err := server.Shutdown(context.TODO() /* TODO */); err != nil {
			ob.Logger.Errorf("WebSocket (%v) 关闭失败, 错误: %v", addr, err)
		}
		// ListenAndServe returns as soon as Shutdown has been called, so
		// this wait is short; it only orders the log output and makes sure
		// the goroutine is not left behind.
		<-done
	}
}