Skip to content

Commit

Permalink
fix: bug
Browse files Browse the repository at this point in the history
  • Loading branch information
mizy committed Aug 16, 2023
1 parent e2b0570 commit f70d78a
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 26 deletions.
69 changes: 68 additions & 1 deletion packages/analytics/pkg/clients/clients.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,76 @@
package clients

import "github.com/gorilla/websocket"
import (
"fmt"
"sync"

"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)

var mu sync.RWMutex

var WsClients map[string]*websocket.Conn = make(map[string]*websocket.Conn)
var PendingMsgMap map[string][]any = make(map[string][]any)

func GetClientByHost(host string) *websocket.Conn {
mu.RLock()
defer mu.RUnlock()
return WsClients[host]
}

func AddClientByHost(host string, conn *websocket.Conn) {
mu.Lock()
defer mu.Unlock()
WsClients[host] = conn
if PendingMsgMap[host] != nil {
for _, msg := range PendingMsgMap[host] {
SendJsonMessage(host, msg)
}
delete(PendingMsgMap, host)
}
}

func SendJsonMessage(host string, message any) error {
defer func() {
if err := recover(); err != nil {
AppendPendingMsg(host, message)
logrus.Errorf("send msg to explorer error: %v", err)
}
}()
err := func() error {
client := GetClientByHost(host)
if client != nil {
return client.WriteJSON(message)
}

return fmt.Errorf("send msg to explorer error: %v", "conn is nil")
}()
if err != nil {
AppendPendingMsg(host, message)
logrus.Error(err)
}
return err
}

func AppendPendingMsg(host string, message any) {
mu.Lock()
defer mu.Unlock()
PendingMsgMap[host] = append(PendingMsgMap[host], message)
}

func Clear() {
mu.Lock()
defer mu.Unlock()
for _, conn := range WsClients {
conn.Close()
}
WsClients = make(map[string]*websocket.Conn)
PendingMsgMap = make(map[string][]any)
}

func DeleteClientByHost(host string) {
mu.Lock()
defer mu.Unlock()
delete(WsClients, host)
}
6 changes: 3 additions & 3 deletions packages/analytics/pkg/service/task/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func (t *TaskService) SendLogToExplorer(text string) {
if testing.Verbose() {
log.Print("------->slice log:", text)
}
conn := clients.GetClientByHost(t.host)
conn.WriteJSON(types.Ws_Message{
msg := types.Ws_Message{
Header: types.Ws_Message_Header{
MsgId: t.msgId,
SendTime: time.Now().UnixMilli(),
Expand All @@ -104,7 +103,8 @@ func (t *TaskService) SendLogToExplorer(text string) {
},
},
},
})
}
clients.SendJsonMessage(t.host, msg)
}

func GetSomeLinesLogWithPath(path string) ([]string, error) {
Expand Down
9 changes: 3 additions & 6 deletions packages/analytics/pkg/service/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ func (t *TaskService) SendTaskStatusToExplorer() {
"endTime": time.Now().UnixMilli(),
},
}
logrus.Info("send task status to explorer: ", t.task.JobId, "_", t.task.TaskId, " status: ", t.task.Status)
conn := clients.GetClientByHost(t.host)
err := conn.WriteJSON(types.Ws_Message{
msg := types.Ws_Message{
Header: types.Ws_Message_Header{
SendTime: time.Now().UnixMilli(),
MsgId: t.msgId,
Expand All @@ -132,10 +130,9 @@ func (t *TaskService) SendTaskStatusToExplorer() {
MsgType: types.Ws_Message_Type_Task,
Content: content,
},
})
if err != nil {
logrus.Errorf("send task status to explorer error: %v", err)
}
logrus.Info("send task status to explorer: ", t.task.JobId, "_", t.task.TaskId, " status: ", t.task.Status)
clients.SendJsonMessage(t.host, msg)
}

func task2cmd(task *TaskInfo, filterPwd bool) string {
Expand Down
24 changes: 8 additions & 16 deletions packages/analytics/pkg/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ws
import (
"encoding/json"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
Expand All @@ -16,10 +15,7 @@ import (
agentConfig "github.com/vesoft-inc/nebula-agent/v3/pkg/config"
)

var mu sync.Mutex

func InitWsConnect() {
clients.WsClients = make(map[string]*websocket.Conn)
for _, host := range config.C.ExplorerHosts {
reconnect(host)
}
Expand All @@ -29,18 +25,11 @@ func InitWsConnect() {

func CloseWsConnect() {
StopHeartBeat()
mu.Lock()
for _, conn := range clients.WsClients {
conn.Close()
}
clients.WsClients = nil
mu.Unlock()
clients.Clear()
}

func reconnect(host string) {
mu.Lock()
delete(clients.WsClients, host)
mu.Unlock()
clients.DeleteClientByHost(host)
err := connect(host)
if err == nil {
logrus.Info("connect success:", host)
Expand Down Expand Up @@ -72,9 +61,7 @@ func connect(host string) error {
logrus.Errorf("connect to %s error: %v", host, err)
return err
}
mu.Lock()
clients.WsClients[host] = conn
mu.Unlock()
clients.AddClientByHost(host, conn)
return nil
}

Expand All @@ -98,6 +85,11 @@ func listen(host string) {
}

func switchRoute(res *types.Ws_Message, host string) {
defer func() {
if r := recover(); r != nil {
logrus.Error("Recovered from panic:", r)
}
}()
switch res.Body.MsgType {
case types.Ws_Message_Type_Task:
go task.HandleAnalyticsTask(res, host)
Expand Down

0 comments on commit f70d78a

Please sign in to comment.