From de1a60df3f2d13e8100b59d908a8f368ac661f86 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 27 Oct 2021 23:09:59 +0530 Subject: [PATCH] feat(dot/telemetry): implement notify.finalized telemetry interface (#1877) --- dot/state/block_finalisation.go | 19 +++- dot/telemetry/block_import.go | 43 +++++++++ dot/telemetry/network_state.go | 66 +++++++++++++ dot/telemetry/notify_finalized.go | 42 +++++++++ dot/telemetry/system_connected.go | 50 ++++++++++ dot/telemetry/system_interval.go | 62 ++++++++++++ dot/telemetry/telemetry.go | 150 ++---------------------------- dot/telemetry/telemetry_test.go | 21 ++++- 8 files changed, 308 insertions(+), 145 deletions(-) create mode 100644 dot/telemetry/block_import.go create mode 100644 dot/telemetry/network_state.go create mode 100644 dot/telemetry/notify_finalized.go create mode 100644 dot/telemetry/system_connected.go create mode 100644 dot/telemetry/system_interval.go diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index ece7ddbd6e..aaeb9ffcfc 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -21,6 +21,7 @@ import ( "fmt" "math/big" + "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" ) @@ -126,7 +127,7 @@ func (bs *BlockState) GetHighestFinalisedHeader() (*types.Header, error) { return header, nil } -// SetFinalisedHash sets the latest finalised block header +// SetFinalisedHash sets the latest finalised block hash func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) error { bs.Lock() defer bs.Unlock() @@ -178,6 +179,22 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er } } + header, err := bs.GetHeader(hash) + if err != nil { + return fmt.Errorf("failed to get finalised header, hash: %s, error: %s", hash, err) + } + + err = telemetry.GetInstance().SendMessage( + telemetry.NewNotifyFinalizedTM( + header.Hash(), + header.Number.String(), + ), + ) + if err != nil { + return fmt.Errorf("could not send 'notify.finalized' telemetry message, error: %s", err) + } + + // return bs.setHighestRoundAndSetID(round, setID) bs.lastFinalised = hash return nil } diff --git a/dot/telemetry/block_import.go b/dot/telemetry/block_import.go new file mode 100644 index 0000000000..3b1a7250b8 --- /dev/null +++ b/dot/telemetry/block_import.go @@ -0,0 +1,43 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import ( + "math/big" + + "github.com/ChainSafe/gossamer/lib/common" +) + +// blockImportTM struct to hold block import telemetry messages +type blockImportTM struct { + BestHash *common.Hash `json:"best"` + Height *big.Int `json:"height"` + Origin string `json:"origin"` +} + +// NewBlockImportTM function to create new Block Import Telemetry Message +func NewBlockImportTM(bestHash *common.Hash, height *big.Int, origin string) Message { + return &blockImportTM{ + BestHash: bestHash, + Height: height, + Origin: origin, + } +} + +func (blockImportTM) messageType() string { + return blockImportMsg +} diff --git a/dot/telemetry/network_state.go b/dot/telemetry/network_state.go new file mode 100644 index 0000000000..c7be70246f --- /dev/null +++ b/dot/telemetry/network_state.go @@ -0,0 +1,66 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import ( + "fmt" + + "github.com/ChainSafe/gossamer/lib/common" + libp2phost "github.com/libp2p/go-libp2p-core/host" +) + +// networkStateTM struct to hold network state telemetry messages +type networkStateTM struct { + State map[string]interface{} `json:"state"` +} + +// NewNetworkStateTM function to create new Network State Telemetry Message +func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) Message { + netState := make(map[string]interface{}) + netState["peerId"] = host.ID() + hostAddrs := make([]string, 0, len(host.Addrs())) + for _, v := range host.Addrs() { + hostAddrs = append(hostAddrs, v.String()) + } + netState["externalAddressess"] = hostAddrs + + netListAddrs := host.Network().ListenAddresses() + listAddrs := make([]string, 0, len(netListAddrs)) + for _, v := range netListAddrs { + listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, host.ID())) + } + netState["listenedAddressess"] = listAddrs + + peers := make(map[string]interface{}) + for _, v := range peerInfos { + p := &peerInfo{ + Roles: v.Roles, + BestHash: v.BestHash.String(), + BestNumber: v.BestNumber, + } + peers[v.PeerID] = *p + } + netState["connectedPeers"] = peers + + return &networkStateTM{ + State: netState, + } +} + +func (networkStateTM) messageType() string { + return systemNetworkStateMsg +} diff --git a/dot/telemetry/notify_finalized.go b/dot/telemetry/notify_finalized.go new file mode 100644 index 0000000000..9f29c3df38 --- /dev/null +++ b/dot/telemetry/notify_finalized.go @@ -0,0 +1,42 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import ( + "github.com/ChainSafe/gossamer/lib/common" +) + +//nolint +// notifyFinalizedTM holds `notify.finalized` telemetry message, which is +// supposed to be send when a new block gets finalized. +type notifyFinalizedTM struct { + Best common.Hash `json:"best"` + // Height is same as block.Header.Number + Height string `json:"height"` +} + +// NewNotifyFinalizedTM gets a new NotifyFinalizedTM struct. +func NewNotifyFinalizedTM(best common.Hash, height string) Message { + return ¬ifyFinalizedTM{ + Best: best, + Height: height, + } +} + +func (notifyFinalizedTM) messageType() string { + return notifyFinalizedMsg +} diff --git a/dot/telemetry/system_connected.go b/dot/telemetry/system_connected.go new file mode 100644 index 0000000000..2bcff1a552 --- /dev/null +++ b/dot/telemetry/system_connected.go @@ -0,0 +1,50 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import "github.com/ChainSafe/gossamer/lib/common" + +// systemConnectedTM struct to hold system connected telemetry messages +type systemConnectedTM struct { + Authority bool `json:"authority"` + Chain string `json:"chain"` + GenesisHash *common.Hash `json:"genesis_hash"` + Implementation string `json:"implementation"` + Name string `json:"name"` + NetworkID string `json:"network_id"` + StartupTime string `json:"startup_time"` + Version string `json:"version"` +} + +// NewSystemConnectedTM function to create new System Connected Telemetry Message +func NewSystemConnectedTM(authority bool, chain string, genesisHash *common.Hash, + implementation, name, networkID, startupTime, version string) Message { + return &systemConnectedTM{ + Authority: authority, + Chain: chain, + GenesisHash: genesisHash, + Implementation: implementation, + Name: name, + NetworkID: networkID, + StartupTime: startupTime, + Version: version, + } +} + +func (systemConnectedTM) messageType() string { + return systemConnectedMsg +} diff --git a/dot/telemetry/system_interval.go b/dot/telemetry/system_interval.go new file mode 100644 index 0000000000..10149f8798 --- /dev/null +++ b/dot/telemetry/system_interval.go @@ -0,0 +1,62 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import ( + "math/big" + + "github.com/ChainSafe/gossamer/lib/common" +) + +// systemIntervalTM struct to hold system interval telemetry messages +type systemIntervalTM struct { + BandwidthDownload float64 `json:"bandwidth_download,omitempty"` + BandwidthUpload float64 `json:"bandwidth_upload,omitempty"` + Peers int `json:"peers,omitempty"` + BestHash *common.Hash `json:"best,omitempty"` + BestHeight *big.Int `json:"height,omitempty"` + FinalisedHash *common.Hash `json:"finalized_hash,omitempty"` // nolint + FinalisedHeight *big.Int `json:"finalized_height,omitempty"` // nolint + TxCount *big.Int `json:"txcount,omitempty"` + UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` +} + +// NewBandwidthTM function to create new Bandwidth Telemetry Message +func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) Message { + return &systemIntervalTM{ + BandwidthDownload: bandwidthDownload, + BandwidthUpload: bandwidthUpload, + Peers: peers, + } +} + +// NewBlockIntervalTM function to create new Block Interval Telemetry Message +func NewBlockIntervalTM(beshHash *common.Hash, bestHeight *big.Int, finalisedHash *common.Hash, + finalisedHeight, txCount, usedStateCacheSize *big.Int) Message { + return &systemIntervalTM{ + BestHash: beshHash, + BestHeight: bestHeight, + FinalisedHash: finalisedHash, + FinalisedHeight: finalisedHeight, + TxCount: txCount, + UsedStateCacheSize: usedStateCacheSize, + } +} + +func (systemIntervalTM) messageType() string { + return systemIntervalMsg +} diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index af08860190..3788a4831d 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -19,16 +19,21 @@ package telemetry import ( "encoding/json" "errors" - "fmt" - "math/big" "sync" "time" - "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" log "github.com/ChainSafe/log15" "github.com/gorilla/websocket" - libp2phost "github.com/libp2p/go-libp2p-core/host" +) + +// telemetry message types +const ( + notifyFinalizedMsg = "notify.finalized" + blockImportMsg = "block.import" + systemNetworkStateMsg = "system.network_state" + systemConnectedMsg = "system.connected" + systemIntervalMsg = "system.interval" ) type telemetryConnection struct { @@ -181,149 +186,12 @@ type Message interface { messageType() string } -// SystemConnectedTM struct to hold system connected telemetry messages -type SystemConnectedTM struct { - Authority bool `json:"authority"` - Chain string `json:"chain"` - GenesisHash *common.Hash `json:"genesis_hash"` - Implementation string `json:"implementation"` - Msg string `json:"msg"` - Name string `json:"name"` - NetworkID string `json:"network_id"` - StartupTime string `json:"startup_time"` - Version string `json:"version"` -} - -// NewSystemConnectedTM function to create new System Connected Telemetry Message -func NewSystemConnectedTM(authority bool, chain string, genesisHash *common.Hash, - implementation, name, networkID, startupTime, version string) *SystemConnectedTM { - return &SystemConnectedTM{ - Authority: authority, - Chain: chain, - GenesisHash: genesisHash, - Implementation: implementation, - Msg: "system.connected", - Name: name, - NetworkID: networkID, - StartupTime: startupTime, - Version: version, - } -} -func (tm *SystemConnectedTM) messageType() string { - return tm.Msg -} - -// BlockImportTM struct to hold block import telemetry messages -type BlockImportTM struct { - BestHash *common.Hash `json:"best"` - Height *big.Int `json:"height"` - Msg string `json:"msg"` - Origin string `json:"origin"` -} - -// NewBlockImportTM function to create new Block Import Telemetry Message -func NewBlockImportTM(bestHash *common.Hash, height *big.Int, origin string) *BlockImportTM { - return &BlockImportTM{ - BestHash: bestHash, - Height: height, - Msg: "block.import", - Origin: origin, - } -} - -func (tm *BlockImportTM) messageType() string { - return tm.Msg -} - -// SystemIntervalTM struct to hold system interval telemetry messages -type SystemIntervalTM struct { - BandwidthDownload float64 `json:"bandwidth_download,omitempty"` - BandwidthUpload float64 `json:"bandwidth_upload,omitempty"` - Msg string `json:"msg"` - Peers int `json:"peers,omitempty"` - BestHash *common.Hash `json:"best,omitempty"` - BestHeight *big.Int `json:"height,omitempty"` - FinalisedHash *common.Hash `json:"finalized_hash,omitempty"` // nolint - FinalisedHeight *big.Int `json:"finalized_height,omitempty"` // nolint - TxCount *big.Int `json:"txcount,omitempty"` - UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` -} - -// NewBandwidthTM function to create new Bandwidth Telemetry Message -func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) *SystemIntervalTM { - return &SystemIntervalTM{ - BandwidthDownload: bandwidthDownload, - BandwidthUpload: bandwidthUpload, - Msg: "system.interval", - Peers: peers, - } -} - -// NewBlockIntervalTM function to create new Block Interval Telemetry Message -func NewBlockIntervalTM(beshHash *common.Hash, bestHeight *big.Int, finalisedHash *common.Hash, - finalisedHeight, txCount, usedStateCacheSize *big.Int) *SystemIntervalTM { - return &SystemIntervalTM{ - Msg: "system.interval", - BestHash: beshHash, - BestHeight: bestHeight, - FinalisedHash: finalisedHash, - FinalisedHeight: finalisedHeight, - TxCount: txCount, - UsedStateCacheSize: usedStateCacheSize, - } -} - -func (tm *SystemIntervalTM) messageType() string { - return tm.Msg -} - type peerInfo struct { Roles byte `json:"roles"` BestHash string `json:"bestHash"` BestNumber uint64 `json:"bestNumber"` } -// NetworkStateTM struct to hold network state telemetry messages -type NetworkStateTM struct { - Msg string `json:"msg"` - State map[string]interface{} `json:"state"` -} - -// NewNetworkStateTM function to create new Network State Telemetry Message -func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *NetworkStateTM { - netState := make(map[string]interface{}) - netState["peerId"] = host.ID() - hostAddrs := []string{} - for _, v := range host.Addrs() { - hostAddrs = append(hostAddrs, v.String()) - } - netState["externalAddressess"] = hostAddrs - listAddrs := []string{} - for _, v := range host.Network().ListenAddresses() { - listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, host.ID())) - } - netState["listenedAddressess"] = listAddrs - - peers := make(map[string]interface{}) - for _, v := range peerInfos { - p := &peerInfo{ - Roles: v.Roles, - BestHash: v.BestHash.String(), - BestNumber: v.BestNumber, - } - peers[v.PeerID] = *p - } - netState["connectedPeers"] = peers - - return &NetworkStateTM{ - Msg: "system.network_state", - State: netState, - } -} -func (tm *NetworkStateTM) messageType() string { - return tm.Msg -} - // NoopHandler struct no op handling (ignoring) telemetry messages type NoopHandler struct { } diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index 61c3283477..898f7127c5 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup - wg.Add(4) + wg.Add(6) resultCh = make(chan []byte) @@ -78,20 +78,35 @@ func TestHandler_SendMulti(t *testing.T) { wg.Done() }() + go func() { + bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + GetInstance().SendMessage(NewNotifyFinalizedTM(bestHash, "32375")) + + wg.Done() + }() + + go func() { + bestHash := common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c") + GetInstance().SendMessage(NewPreparedBlockForProposingTM(bestHash, "1")) + + wg.Done() + }() + wg.Wait() expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint + expected5 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`) expected6 := []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`) - expected := [][]byte{expected1, expected3, expected4, expected2, expected6} + expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected6} var actual [][]byte for data := range resultCh { actual = append(actual, data) - if len(actual) == 4 { + if len(actual) == 6 { break } }