Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/telemetry): implement telemetry message network_state #1618

Merged
merged 32 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd1024c
refactor telemetry messages to map format
edwardmack May 31, 2021
d867d2f
add basic network state telemetry message
edwardmack Jun 1, 2021
17aba94
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
cd3abbb
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
74abc1b
refactor message sender to handle interface{} types
edwardmack Jun 11, 2021
6997fa1
refactor telemetry messages to be structs
edwardmack Jun 15, 2021
90506c0
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 15, 2021
9b4eb7a
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 16, 2021
7c00e1e
lint
edwardmack Jun 16, 2021
5ceddd9
go fmt
edwardmack Jun 16, 2021
266c3dd
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 18, 2021
529bac0
lint
edwardmack Jun 18, 2021
6eb7b63
move msg building logic outside msg sending loop
edwardmack Jun 18, 2021
4e1f92b
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 19, 2021
13024dd
make telemetry messages an interface
edwardmack Jun 19, 2021
7263912
Lookup transactions count from TransactionsState
edwardmack Jun 19, 2021
6f07942
address comments
edwardmack Jun 19, 2021
545c25c
fix mocks for tests
edwardmack Jun 21, 2021
17bd211
lint
edwardmack Jun 21, 2021
ac904b0
refactor TelemetryMessage to Message
edwardmack Jun 21, 2021
b0e43fa
update mock handler to return result
edwardmack Jun 22, 2021
3247f7e
add TransactionsCount to mockhandler
edwardmack Jun 22, 2021
ffc8428
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
cf87b93
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
9ab5339
move logic to build new network state message
edwardmack Jun 23, 2021
08fd38a
lint
edwardmack Jun 23, 2021
e5c4de9
fix interface
edwardmack Jun 23, 2021
02f03db
update mockhandler
edwardmack Jun 24, 2021
0428696
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a160f58
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a257def
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 28, 2021
0321eca
lint
edwardmack Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions chain/dev/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"id": "dev",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": null,
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/dev/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -32,4 +37,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
8 changes: 7 additions & 1 deletion chain/gssmr/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
"id": "gssmr",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/gssmr/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -40,4 +46,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
67 changes: 54 additions & 13 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"io"
"math/big"
"os"
"sync"
"time"
Expand Down Expand Up @@ -304,6 +305,12 @@ func (s *Service) logPeerCount() {
}
}

type peerInfo struct {
Roles byte `json:"roles"`
BestHash string `json:"bestHash"`
BestNumber uint64 `json:"bestNumber"`
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering why this uses done chan instead of the service ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following suggestions from this thread: #1528 (comment)

ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()
Expand All @@ -316,11 +323,41 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
err := telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{
BandwidthDownload: o.RateIn,
BandwidthUpload: o.RateOut,
Peers: s.host.peerCount(),
})
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
netState := make(map[string]interface{})
netState["peerId"] = s.host.h.ID()
hostAddrs := []string{}
for _, v := range s.host.h.Addrs() {
hostAddrs = append(hostAddrs, v.String())
}
netState["externalAddressess"] = hostAddrs
listAddrs := []string{}
for _, v := range s.host.h.Network().ListenAddresses() {
listAddrs = append(listAddrs, v.String())
}
netState["listenedAddressess"] = listAddrs

Copy link
Contributor

@noot noot Jun 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this expecting the multiaddress including peer ID? if so you will need to use something likenthis to create the listen address strings:

fmt.Sprintf("%s/p2p/%s", multiaddress, id)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

peers := make(map[string]interface{})
for _, v := range s.Peers() {
p := &peerInfo{
Roles: v.Roles,
BestHash: v.BestHash.String(),
BestNumber: v.BestNumber,
}
peers[v.PeerID] = *p
}
netState["connectedPeers"] = peers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this code be moved into NewNetworkStateTM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would make more sense there, moved.


err = telemetry.GetInstance().SendMessage(telemetry.NetworkStateTM{
State: netState,
})
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand All @@ -334,19 +371,23 @@ func (s *Service) sentBlockIntervalTelemetry() {
if err != nil {
continue
}
bestHash := best.Hash()

finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}
finalizedHash := finalized.Hash()

err = telemetry.GetInstance().SendMessage(telemetry.SystemIntervalTM{
BestHash: &bestHash,
BestHeight: best.Number,
FinalisedHash: &finalizedHash,
FinalisedHeight: finalized.Number,
TxCount: big.NewInt(0), // todo (ed) determine where to get tx count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this regarding the tx pool? if so you can get the size via the TransactionState

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've added interface to get Transactions Count.

UsedStateCacheSize: big.NewInt(0), // todo (ed) determine where to get used_state_cache_size
})

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand Down
22 changes: 11 additions & 11 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,17 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
telemetry.NewKeyValue("msg", "system.connected"),
telemetry.NewKeyValue("name", cfg.Global.Name),
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))
genesisHash := stateSrvc.Block.GenesisHash()
err = telemetry.GetInstance().SendMessage(telemetry.SystemConnectedTM{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: &genesisHash,
Implementation: sysSrvc.SystemName(),
Name: cfg.Global.Name,
NetworkID: networkSrvc.NetworkState().PeerID,
StartupTime: strconv.FormatInt(time.Now().UnixNano(), 10),
Version: sysSrvc.SystemVersion(),
})
if err != nil {
logger.Debug("problem sending system.connected telemetry message", "err", err)
}
Expand Down
15 changes: 9 additions & 6 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,15 @@ func (s *Service) handleBlock(block *types.Block) error {
return err
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( // nolint
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
blockHash := block.Header.Hash()
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", blockHash)

err := telemetry.GetInstance().SendMessage(telemetry.BlockImportTM{ // nolint
BestHash: &blockHash,
Height: block.Header.Number,
Origin: "NetworkInitialSync",
})

if err != nil {
logger.Debug("problem sending block.import telemetry message", "error", err)
}
Expand Down
134 changes: 85 additions & 49 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package telemetry
import (
"encoding/json"
"errors"
"math/big"
"reflect"
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
log "github.com/ChainSafe/log15"
"github.com/gorilla/websocket"
Expand All @@ -33,24 +36,13 @@ type telemetryConnection struct {
sync.Mutex
}

// Message struct to hold telemetry message data
type Message struct {
values map[string]interface{}
}

// Handler struct for holding telemetry related things
type Handler struct {
msg chan Message
msg chan interface{}
connections []*telemetryConnection
log log.Logger
}

// KeyValue object to hold key value pairs used in telemetry messages
type KeyValue struct {
key string
value interface{}
}

var (
once sync.Once
handlerInstance *Handler
Expand All @@ -62,7 +54,7 @@ func GetInstance() *Handler { //nolint
once.Do(
func() {
handlerInstance = &Handler{
msg: make(chan Message, 256),
msg: make(chan interface{}, 256),
log: log.New("pkg", "telemetry"),
}
go handlerInstance.startListening()
Expand All @@ -71,25 +63,6 @@ func GetInstance() *Handler { //nolint
return handlerInstance
}

// NewTelemetryMessage builds a telemetry message
func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint
mvals := make(map[string]interface{})
for _, v := range values {
mvals[v.key] = v.value
}
return &Message{
values: mvals,
}
}

// NewKeyValue builds a key value pair for telemetry messages
func NewKeyValue(key string, value interface{}) *KeyValue { //nolint
return &KeyValue{
key: key,
value: value,
}
}

// AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
Expand All @@ -108,9 +81,9 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
}

// SendMessage sends Message to connected telemetry listeners
func (h *Handler) SendMessage(msg *Message) error {
func (h *Handler) SendMessage(msg interface{}) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you create a TelemetryMessage interface that the message types all implement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, created TelemetryMessage interface.

select {
case h.msg <- *msg:
case h.msg <- msg:

case <-time.After(time.Second * 1):
return errors.New("timeout sending message")
Expand All @@ -124,31 +97,94 @@ func (h *Handler) startListening() {
go func() {
for _, conn := range h.connections {
conn.Lock()
err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg))
defer conn.Unlock()
msgBytes, err := h.msgToJSON(msg)
if err != nil || len(msgBytes) == 0 {
h.log.Debug("issue decoding telemetry message", "error", err)
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this out of the for loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, no need to keep rebuilding the message.


err = conn.wsconn.WriteMessage(websocket.TextMessage, msgBytes)
if err != nil {
h.log.Warn("issue while sending telemetry message", "error", err)
}
conn.Unlock()
}
}()
}
}

type response struct {
ID int `json:"id"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"ts"`
}
func (h *Handler) msgToJSON(message interface{}) ([]byte, error) {
defer h.recoverMessage()

func msgToBytes(message Message) []byte {
res := response{
ID: 1, // todo (ed) determine how this is used
Payload: message.values,
Timestamp: time.Now(),
messageBytes, err := json.Marshal(message)
if err != nil {
return nil, err
}
resB, err := json.Marshal(res)

messageMap := make(map[string]interface{})
err = json.Unmarshal(messageBytes, &messageMap)
if err != nil {
return nil
return nil, err
}
return resB

messageMap["ts"] = time.Now()
typ := reflect.TypeOf(message)
field, found := typ.FieldByName("Msg")
if !found {
return []byte{}, errors.New("unknown telemetry message type")
}
def := field.Tag.Get("default")
messageMap["msg"] = def

fullRes, err := json.Marshal(messageMap)
if err != nil {
return nil, err
}
return fullRes, nil
}
func (h *Handler) recoverMessage() {
if r := recover(); r != nil {
h.log.Debug("recovered", "issue", r)
}
}

// SystemConnectedTM struct to hold system connected telemetry messages
type SystemConnectedTM struct {
Authority bool `json:"authority"`
Chain string `json:"chain"`
GenesisHash *common.Hash `json:"genesis_hash"`
Implementation string `json:"implementation"`
Msg string `default:"system.connected" json:"msg"`
Name string `json:"name"`
NetworkID string `json:"network_id"`
StartupTime string `json:"startup_time"`
Version string `json:"version"`
}

// BlockImportTM struct to hold block import telemetry messages
type BlockImportTM struct {
BestHash *common.Hash `json:"best"`
Height *big.Int `json:"height"`
Msg string `default:"block.import" json:"msg"`
Origin string `json:"origin"`
}

// SystemIntervalTM struct to hold system interval telemetry messages
type SystemIntervalTM struct {
BandwidthDownload float64 `json:"bandwidth_download,omitempty"`
BandwidthUpload float64 `json:"bandwidth_upload,omitempty"`
Msg string `default:"system.interval" json:"msg"`
Peers int `json:"peers,omitempty"`
BestHash *common.Hash `json:"best,omitempty"`
BestHeight *big.Int `json:"height,omitempty"`
FinalisedHash *common.Hash `json:"finalized_hash,omitempty"` // nolint
FinalisedHeight *big.Int `json:"finalized_height,omitempty"` // nolint
TxCount *big.Int `json:"txcount,omitempty"`
UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"`
}

// NetworkStateTM struct to hold network state telemetry messages
type NetworkStateTM struct {
Msg string `default:"system.network_state" json:"msg"`
State map[string]interface{} `json:"state"`
}
Loading