From bc31ac7981d12c1e4e5ff8f19ec282ffa3d8d4cd Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Fri, 28 May 2021 19:15:12 -0400 Subject: [PATCH] fix(dot/telemetry): refactor telemetry to reduce CPU usage (#1597) --- dot/network/service.go | 29 +++-- dot/node.go | 24 ++-- dot/sync/syncer.go | 9 +- dot/telemetry/telemetry.go | 200 ++++++++++++++------------------ dot/telemetry/telemetry_test.go | 79 +++++++++---- 5 files changed, 181 insertions(+), 160 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index 95c35ebca4..5c86e153af 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -313,9 +313,15 @@ main: case <-ticker.C: o := s.host.bwc.GetBandwidthTotals() - telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut)) + err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( + telemetry.NewKeyValue("bandwidth_download", o.RateIn), + telemetry.NewKeyValue("bandwidth_upload", o.RateOut), + telemetry.NewKeyValue("msg", "system.interval"), + telemetry.NewKeyValue("peers", s.host.peerCount()))) + if err != nil { + logger.Debug("problem sending system.interval telemetry message", "error", err) + } } - } } @@ -330,14 +336,17 @@ func (s *Service) sentBlockIntervalTelemetry() { continue } - telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{ - BestHash: best.Hash(), - BestHeight: best.Number, - FinalizedHash: finalized.Hash(), - FinalizedHeight: finalized.Number, - TXCount: 0, // todo (ed) determine where to get tx count - UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size - }) + err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( + telemetry.NewKeyValue("best", best.Hash().String()), + telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint + telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint + telemetry.NewKeyValue("height", best.Number), + telemetry.NewKeyValue("msg", "system.interval"), + telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count + telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size + if err != nil { + logger.Debug("problem sending system.interval telemetry message", "error", err) + } time.Sleep(s.telemetryInterval) } } diff --git a/dot/node.go b/dot/node.go index 59444d47e5..4b535d6adb 100644 --- a/dot/node.go +++ b/dot/node.go @@ -350,18 +350,20 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, } telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints) - data := &telemetry.ConnectionData{ - Authority: cfg.Core.GrandpaAuthority, - Chain: sysSrvc.ChainName(), - GenesisHash: stateSrvc.Block.GenesisHash().String(), - SystemName: sysSrvc.SystemName(), - NodeName: cfg.Global.Name, - SystemVersion: sysSrvc.SystemVersion(), - NetworkID: networkSrvc.NetworkState().PeerID, - StartTime: strconv.FormatInt(time.Now().UnixNano(), 10), - } - telemetry.GetInstance().SendConnection(data) + err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( + telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority), + telemetry.NewKeyValue("chain", sysSrvc.ChainName()), + telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()), + telemetry.NewKeyValue("implementation", sysSrvc.SystemName()), + telemetry.NewKeyValue("msg", "system.connected"), + telemetry.NewKeyValue("name", cfg.Global.Name), + telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID), + telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)), + telemetry.NewKeyValue("version", sysSrvc.SystemVersion()))) + if err != nil { + logger.Debug("problem sending system.connected telemetry message", "err", err) + } return node, nil } diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 88497b49a4..e88f4a2fd8 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -364,7 +364,14 @@ func (s *Service) handleBlock(block *types.Block) error { } } else { logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash()) - telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number) + err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( + telemetry.NewKeyValue("best", block.Header.Hash().String()), + telemetry.NewKeyValue("height", block.Header.Number.Uint64()), + telemetry.NewKeyValue("msg", "block.import"), + telemetry.NewKeyValue("origin", "NetworkInitialSync"))) + if err != nil { + logger.Debug("problem sending block.import telemetry message", "error", err) + } } // handle consensus digest for authority changes diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 8e94411cac..cd11d65207 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -17,38 +17,38 @@ package telemetry import ( - "bytes" "encoding/json" - "fmt" - "math/big" + "errors" "sync" "time" - "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" + log "github.com/ChainSafe/log15" "github.com/gorilla/websocket" - log "github.com/sirupsen/logrus" ) -// Handler struct for holding telemetry related things -type Handler struct { - buf bytes.Buffer - wsConn []*websocket.Conn - sync.RWMutex +type telemetryConnection struct { + wsconn *websocket.Conn + verbosity int + sync.Mutex } -// MyJSONFormatter struct for defining JSON Formatter -type MyJSONFormatter struct { +// Message struct to hold telemetry message data +type Message struct { + values map[string]interface{} } -// Format function for handling JSON formatting, this overrides default logging formatter to remove -// log level, line number and timestamp -func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) { - serialised, err := json.Marshal(entry.Data) - if err != nil { - return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err) - } - return append(serialised, '\n'), nil +// Handler struct for holding telemetry related things +type Handler struct { + msg chan Message + connections []*telemetryConnection + log log.Logger +} + +// KeyValue object to hold key value pairs used in telemetry messages +type KeyValue struct { + key string + value interface{} } var ( @@ -57,126 +57,98 @@ var ( ) // GetInstance singleton pattern to for accessing TelemetryHandler -func GetInstance() *Handler { +func GetInstance() *Handler { //nolint if handlerInstance == nil { once.Do( func() { handlerInstance = &Handler{ - buf: bytes.Buffer{}, + msg: make(chan Message, 256), + log: log.New("pkg", "telemetry"), } - log.SetOutput(&handlerInstance.buf) - log.SetFormatter(new(MyJSONFormatter)) - go handlerInstance.sender() + go handlerInstance.startListening() }) } return handlerInstance } -// AddConnections adds connections to telemetry sever +// NewTelemetryMessage builds a telemetry message +func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint + mvals := make(map[string]interface{}) + for _, v := range values { + mvals[v.key] = v.value + } + return &Message{ + values: mvals, + } +} + +// NewKeyValue builds a key value pair for telemetry messages +func NewKeyValue(key string, value interface{}) *KeyValue { //nolint + return &KeyValue{ + key: key, + value: value, + } +} + +// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { for _, v := range conns { c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil) if err != nil { - fmt.Printf("Error %v\n", err) + // todo (ed) try reconnecting if there is an error connecting + h.log.Debug("issue adding telemetry connection", "error", err) continue } - h.wsConn = append(h.wsConn, c) + tConn := &telemetryConnection{ + wsconn: c, + verbosity: v.Verbosity, + } + h.connections = append(h.connections, tConn) } } -// ConnectionData struct to hold connection data -type ConnectionData struct { - Authority bool - Chain string - GenesisHash string - SystemName string - NodeName string - SystemVersion string - NetworkID string - StartTime string -} - -// SendConnection sends connection request message to telemetry connection -func (h *Handler) SendConnection(data *ConnectionData) { - h.Lock() - defer h.Unlock() - payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash, - "implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime, - "version": data.SystemVersion} - telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - telemetryLogger.Print() -} - -// SendBlockImport sends block imported message to telemetry connection -func (h *Handler) SendBlockImport(bestHash string, height *big.Int) { - h.Lock() - defer h.Unlock() - payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"} - telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - telemetryLogger.Print() -} +// SendMessage sends Message to connected telemetry listeners +func (h *Handler) SendMessage(msg *Message) error { + select { + case h.msg <- *msg: -// NetworkData struct to hold network data telemetry information -type NetworkData struct { - peers int - rateIn float64 - rateOut float64 -} - -// NewNetworkData creates networkData struct -func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData { - return &NetworkData{ - peers: peers, - rateIn: rateIn, - rateOut: rateOut, + case <-time.After(time.Second * 1): + return errors.New("timeout sending message") } + return nil } -// SendNetworkData send network data system.interval message to telemetry connection -func (h *Handler) SendNetworkData(data *NetworkData) { - h.Lock() - defer h.Unlock() - payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers} - telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - telemetryLogger.Print() -} - -// BlockIntervalData struct to hold data for block system.interval message -type BlockIntervalData struct { - BestHash common.Hash - BestHeight *big.Int - FinalizedHash common.Hash - FinalizedHeight *big.Int - TXCount int - UsedStateCacheSize int +func (h *Handler) startListening() { + for { + msg := <-h.msg + go func() { + for _, conn := range h.connections { + conn.Lock() + err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) + if err != nil { + h.log.Warn("issue while sending telemetry message", "error", err) + } + conn.Unlock() + } + }() + } } -// SendBlockIntervalData send block data system interval information to telemetry connection -func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) { - h.Lock() - defer h.Unlock() - payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint - "finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint - "used_state_cache_size": data.UsedStateCacheSize} - telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - telemetryLogger.Print() +type response struct { + ID int `json:"id"` + Payload map[string]interface{} `json:"payload"` + Timestamp time.Time `json:"ts"` } -func (h *Handler) sender() { - for { - h.RLock() - line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter - h.RUnlock() - if err != nil { - continue - } - - for _, c := range h.wsConn { - err := c.WriteMessage(websocket.TextMessage, line) - if err != nil { - // TODO (ed) determine how to handle this error - fmt.Printf("ERROR connecting to telemetry %v\n", err) - } - } +func msgToBytes(message Message) []byte { + res := response{ + ID: 1, // todo (ed) determine how this is used + Payload: message.values, + Timestamp: time.Now(), + } + resB, err := json.Marshal(res) + if err != nil { + return nil } + return resB } diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index 3cdbed43e0..e2b7b9c05b 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -11,13 +11,13 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" ) var upgrader = websocket.Upgrader{} +var resultCh chan []byte func TestMain(m *testing.M) { // start server to listen for websocket connections @@ -39,8 +39,6 @@ func TestMain(m *testing.M) { os.Exit(code) } -var resultCh chan []byte - func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup wg.Add(4) @@ -48,45 +46,52 @@ func TestHandler_SendMulti(t *testing.T) { resultCh = make(chan []byte) go func() { - GetInstance().SendConnection(&ConnectionData{ - Authority: false, - Chain: "chain", - GenesisHash: "hash", - SystemName: "systemName", - NodeName: "nodeName", - SystemVersion: "version", - NetworkID: "netID", - StartTime: "startTime", - }) + GetInstance().SendMessage(NewTelemetryMessage( + NewKeyValue("authority", false), + NewKeyValue("chain", "chain"), + NewKeyValue("genesis_hash", "hash"), + NewKeyValue("implementation", "systemName"), + NewKeyValue("msg", "system.connected"), + NewKeyValue("name", "nodeName"), + NewKeyValue("network_id", "netID"), + NewKeyValue("startup_time", "startTime"), + NewKeyValue("version", "version"))) wg.Done() }() go func() { - GetInstance().SendBlockImport("hash", big.NewInt(2)) + GetInstance().SendMessage(NewTelemetryMessage( + NewKeyValue("best", "hash"), + NewKeyValue("height", big.NewInt(2)), + NewKeyValue("msg", "block.import"), + NewKeyValue("origin", "NetworkInitialSync"))) wg.Done() }() go func() { - GetInstance().SendNetworkData(NewNetworkData(1, 2, 3)) + GetInstance().SendMessage(NewTelemetryMessage( + NewKeyValue("bandwidth_download", 2), + NewKeyValue("bandwidth_upload", 3), + NewKeyValue("msg", "system.interval"), + NewKeyValue("peers", 1))) wg.Done() }() go func() { - GetInstance().SendBlockIntervalData(&BlockIntervalData{ - BestHash: common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), - BestHeight: big.NewInt(32375), - FinalizedHash: common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), - FinalizedHeight: big.NewInt(32256), - TXCount: 2, - UsedStateCacheSize: 1886357, - }) + GetInstance().SendMessage(NewTelemetryMessage( + NewKeyValue("best", "0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), + NewKeyValue("finalized_hash", "0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), // nolint + NewKeyValue("finalized_height", 32256), NewKeyValue("height", 32375), // nolint + NewKeyValue("msg", "system.interval"), NewKeyValue("txcount", 2), + NewKeyValue("used_state_cache_size", 1886357))) wg.Done() }() + wg.Wait() expected1 := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`) expected2 := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`) - expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","config":"","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) + expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) expected4 := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`) // nolint expected := [][]byte{expected3, expected1, expected4, expected2} @@ -108,6 +113,32 @@ func TestHandler_SendMulti(t *testing.T) { require.Contains(t, string(actual[3]), string(expected[3])) } +func TestListenerConcurrency(t *testing.T) { + const qty = 1000 + var wg sync.WaitGroup + wg.Add(qty) + + resultCh = make(chan []byte) + for i := 0; i < qty; i++ { + go func() { + GetInstance().SendMessage(NewTelemetryMessage( + NewKeyValue("best", "hash"), + NewKeyValue("height", big.NewInt(2)), + NewKeyValue("msg", "block.import"), + NewKeyValue("origin", "NetworkInitialSync"))) + wg.Done() + }() + } + wg.Wait() + counter := 0 + for range resultCh { + counter++ + if counter == qty { + break + } + } +} + func listen(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil {