Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/telemetry): implement telemetry message network_state #1618

Merged
merged 32 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fd1024c
refactor telemetry messages to map format
edwardmack May 31, 2021
d867d2f
add basic network state telemetry message
edwardmack Jun 1, 2021
17aba94
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
cd3abbb
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 11, 2021
74abc1b
refactor message sender to handle interface{} types
edwardmack Jun 11, 2021
6997fa1
refactor telemetry messages to be structs
edwardmack Jun 15, 2021
90506c0
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 15, 2021
9b4eb7a
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 16, 2021
7c00e1e
lint
edwardmack Jun 16, 2021
5ceddd9
go fmt
edwardmack Jun 16, 2021
266c3dd
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 18, 2021
529bac0
lint
edwardmack Jun 18, 2021
6eb7b63
move msg building logic outside msg sending loop
edwardmack Jun 18, 2021
4e1f92b
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 19, 2021
13024dd
make telemetry messages an interface
edwardmack Jun 19, 2021
7263912
Lookup transactions count from TransactionsState
edwardmack Jun 19, 2021
6f07942
address comments
edwardmack Jun 19, 2021
545c25c
fix mocks for tests
edwardmack Jun 21, 2021
17bd211
lint
edwardmack Jun 21, 2021
ac904b0
refactor TelemetryMessage to Message
edwardmack Jun 21, 2021
b0e43fa
update mock handler to return result
edwardmack Jun 22, 2021
3247f7e
add TransactionsCount to mockhandler
edwardmack Jun 22, 2021
ffc8428
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
cf87b93
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 23, 2021
9ab5339
move logic to build new network state message
edwardmack Jun 23, 2021
08fd38a
lint
edwardmack Jun 23, 2021
e5c4de9
fix interface
edwardmack Jun 23, 2021
02f03db
update mockhandler
edwardmack Jun 24, 2021
0428696
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a160f58
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 24, 2021
a257def
Merge branch 'development' into ed/tel_msg_network_state
edwardmack Jun 28, 2021
0321eca
lint
edwardmack Jun 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions chain/dev/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"id": "dev",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": null,
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/dev/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -32,4 +37,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
8 changes: 7 additions & 1 deletion chain/gssmr/genesis.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
"id": "gssmr",
"chainType": "Local",
"bootNodes": [],
"telemetryEndpoints": [
[
"wss://telemetry.polkadot.io/submit/",
0
]
],
"protocolId": "/gossamer/gssmr/0",
"genesis": {
"raw": {
Expand Down Expand Up @@ -40,4 +46,4 @@
"forkBlocks": null,
"badBlocks": null,
"consensusEngine": ""
}
}
5 changes: 5 additions & 0 deletions dot/core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo

return len(msg.Extrinsics) > 0, nil
}

// TransactionsCount returns number for pending transactions in pool
func (s *Service) TransactionsCount() int {
return len(s.transactionState.PendingInPool())
}
6 changes: 3 additions & 3 deletions dot/core/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

. "github.com/ChainSafe/gossamer/dot/core/mocks"
. "github.com/ChainSafe/gossamer/dot/core/mocks" // nolint
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
Expand All @@ -38,7 +38,7 @@ import (

func TestService_ProcessBlockAnnounceMessage(t *testing.T) {
// TODO: move to sync package
net := new(MockNetwork)
net := new(MockNetwork) // nolint

cfg := &Config{
Network: net,
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestService_HandleTransactionMessage(t *testing.T) {
ks := keystore.NewGlobalKeystore()
ks.Acco.Insert(kp)

bp := new(MockBlockProducer)
bp := new(MockBlockProducer) // nolint
blockC := make(chan types.Block)
bp.On("GetBlockChannel", nil).Return(blockC)

Expand Down
14 changes: 14 additions & 0 deletions dot/network/mock_transaction_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 50 additions & 14 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"math/big"
"os"
"sync"
"time"
Expand Down Expand Up @@ -304,6 +306,12 @@ func (s *Service) logPeerCount() {
}
}

type peerInfo struct {
Roles byte `json:"roles"`
BestHash string `json:"bestHash"`
BestNumber uint64 `json:"bestNumber"`
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering why this uses done chan instead of the service ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following suggestions from this thread: #1528 (comment)

ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()
Expand All @@ -316,11 +324,36 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount()))

if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
netState := make(map[string]interface{})
netState["peerId"] = s.host.h.ID()
hostAddrs := []string{}
for _, v := range s.host.h.Addrs() {
hostAddrs = append(hostAddrs, v.String())
}
netState["externalAddressess"] = hostAddrs
listAddrs := []string{}
for _, v := range s.host.h.Network().ListenAddresses() {
listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, s.host.h.ID()))
}
netState["listenedAddressess"] = listAddrs

peers := make(map[string]interface{})
for _, v := range s.Peers() {
p := &peerInfo{
Roles: v.Roles,
BestHash: v.BestHash.String(),
BestNumber: v.BestNumber,
}
peers[v.PeerID] = *p
}
netState["connectedPeers"] = peers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this code be moved into NewNetworkStateTM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would make more sense there, moved.


err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(netState))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand All @@ -334,19 +367,22 @@ func (s *Service) sentBlockIntervalTelemetry() {
if err != nil {
continue
}
bestHash := best.Hash()

finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
finalizedHash := finalized.Hash()

err = telemetry.GetInstance().SendMessage(telemetry.NewBlockIntervalTM(
&bestHash,
best.Number,
&finalizedHash,
finalized.Number,
big.NewInt(int64(s.transactionHandler.TransactionsCount())),
big.NewInt(0), // todo (ed) determine where to get used_state_cache_size
))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
Expand Down
1 change: 1 addition & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}

Expand Down
3 changes: 2 additions & 1 deletion dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ type Syncer interface {

// TransactionHandler is the interface used by the transactions sub-protocol
type TransactionHandler interface {
HandleTransactionMessage(*TransactionMessage) (bool, error)
HandleTransactionMessage(*TransactionMessage) error
TransactionsCount() int
}
1 change: 1 addition & 0 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMockSyncer() *MockSyncer {
func NewMockTransactionHandler() *MockTransactionHandler {
mocktxhandler := new(MockTransactionHandler)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil)
mocktxhandler.On("TransactionsCount").Return(0)
return mocktxhandler
}

Expand Down
3 changes: 2 additions & 1 deletion dot/network/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestDecodeTransactionMessage(t *testing.T) {
func TestHandleTransactionMessage(t *testing.T) {
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil)
mockhandler.On("TransactionsCount").Return(0)

config := &Config{
BasePath: basePath,
Expand Down
21 changes: 10 additions & 11 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,16 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
telemetry.NewKeyValue("msg", "system.connected"),
telemetry.NewKeyValue("name", cfg.Global.Name),
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))
genesisHash := stateSrvc.Block.GenesisHash()
err = telemetry.GetInstance().SendMessage(telemetry.NewSystemConnectedTM(
cfg.Core.GrandpaAuthority,
sysSrvc.ChainName(),
&genesisHash,
sysSrvc.SystemName(),
cfg.Global.Name,
networkSrvc.NetworkState().PeerID,
strconv.FormatInt(time.Now().UnixNano(), 10),
sysSrvc.SystemVersion()))
if err != nil {
logger.Debug("problem sending system.connected telemetry message", "err", err)
}
Expand Down
12 changes: 6 additions & 6 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,13 @@ func (s *Service) handleBlock(block *types.Block) error {

logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( // nolint
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
blockHash := block.Header.Hash()
err = telemetry.GetInstance().SendMessage(telemetry.NewBlockImportTM(
&blockHash,
block.Header.Number,
"NetworkInitialSync"))
if err != nil {
logger.Trace("problem sending block.import telemetry message", "error", err)
logger.Debug("problem sending block.import telemetry message", "error", err)
}

return nil
Expand Down
Loading