Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (dot/telemetry): implement telemetry system.interval message #1528

Merged
merged 22 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d68f7f2
implement network data sytem interval telemetry message
edwardmack Apr 15, 2021
cfa4302
add lock to telemetry struct to fix concurrent websocket writes
edwardmack Apr 15, 2021
67448d7
implement block data system.interval telemetry message
edwardmack Apr 15, 2021
ac28587
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 23, 2021
1c68d46
address comments
edwardmack Apr 23, 2021
b35bb3a
fix race condition
edwardmack Apr 26, 2021
cc07fdf
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 26, 2021
922ac7c
fix lint
edwardmack Apr 26, 2021
3ab3bc7
update tests
edwardmack Apr 27, 2021
4b1ee15
refactor tests
edwardmack Apr 27, 2021
a9ddf04
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 27, 2021
1cbed8d
use interface{} for channel, add recover
edwardmack Apr 28, 2021
6eab866
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 28, 2021
1292bfa
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 29, 2021
25d35e4
rename channel doneNetworkTelemetry to closeCh
edwardmack Apr 29, 2021
3caeb27
fix check for closed channel
edwardmack Apr 29, 2021
517a46c
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 29, 2021
9f60995
fix error checking
edwardmack May 3, 2021
54b3233
Merge branch 'development' into ed/tel_system_interval
edwardmack May 3, 2021
8735fed
Merge branch 'development' into ed/tel_system_interval
edwardmack May 4, 2021
66e85c7
Merge branch 'development' into ed/tel_system_interval
edwardmack May 4, 2021
121c7fb
Merge branch 'development' into ed/tel_system_interval
edwardmack May 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"errors"
"path"
"time"

log "github.com/ChainSafe/log15"
"github.com/libp2p/go-libp2p-core/crypto"
Expand Down Expand Up @@ -93,6 +94,9 @@ type Config struct {

// PublishMetrics enables collection of network metrics
PublishMetrics bool

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration
}

// build checks the configuration, sets up the private key for the network service,
Expand Down Expand Up @@ -134,6 +138,11 @@ func (c *Config) build() error {
c.logger.Warn("Bootstrap is enabled but no bootstrap nodes are defined")
}

// set telemetryInterval to default
if c.telemetryInterval.Microseconds() == 0 {
c.telemetryInterval = time.Second * 5
}

return nil
}

Expand Down
15 changes: 13 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand Down Expand Up @@ -59,6 +60,7 @@ type host struct {
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -167,6 +169,8 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

bwc := metrics.NewBandwidthCounter()

host := &host{
ctx: ctx,
h: h,
Expand All @@ -177,6 +181,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
bwc: bwc,
}

cm.host = host
Expand Down Expand Up @@ -311,8 +316,14 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
lenBytes := uint64ToLEB128(msgLen)
encMsg = append(lenBytes, encMsg...)

_, err = s.Write(encMsg)
return err
sent, err := s.Write(encMsg)
if err != nil {
return err
}

h.bwc.LogSentMessage(int64(sent))

return nil
}

// getOutboundStream returns the outbound message stream for the given peer or returns
Expand Down
69 changes: 67 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"time"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"

log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -89,6 +89,10 @@ type Service struct {
noDiscover bool
noMDNS bool
noGossip bool // internal option

// telemetry
telemetryInterval time.Duration
closeCh chan interface{}
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -142,6 +146,8 @@ func NewService(cfg *Config) (*Service, error) {
syncer: cfg.Syncer,
notificationsProtocols: make(map[byte]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -246,6 +252,9 @@ func (s *Service) Start() error {
}

go s.logPeerCount()
go s.publishNetworkTelemetry(s.closeCh)
go s.sentBlockIntervalTelemetry()

return nil
}

Expand Down Expand Up @@ -279,6 +288,47 @@ func (s *Service) logPeerCount() {
}
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()

main:
for {
select {
case <-done:
break main

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut))
}

}
}

func (s *Service) sentBlockIntervalTelemetry() {
for {
best, err := s.blockState.BestBlockHeader()
if err != nil {
continue
}
finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}

telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
BestHash: best.Hash(),
BestHeight: best.Number,
FinalizedHash: finalized.Hash(),
FinalizedHeight: finalized.Number,
TXCount: 0, // todo (ed) determine where to get tx count
UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
})
time.Sleep(s.telemetryInterval)
}
}

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
Expand Down Expand Up @@ -344,6 +394,19 @@ func (s *Service) Stop() error {
logger.Error("Failed to close host", "error", err)
}

// check if closeCh is closed, if not, close it.
mainloop:
for {
select {
case _, hasMore := <-s.closeCh:
if !hasMore {
break mainloop
}
default:
close(s.closeCh)
}
}

return nil
}

Expand Down Expand Up @@ -521,6 +584,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
_ = stream.Close()
return
}

s.host.bwc.LogRecvMessage(int64(tot))
}
}

Expand Down
89 changes: 75 additions & 14 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
buf bytes.Buffer
wsConn []*websocket.Conn
sync.RWMutex
}

// MyJSONFormatter struct for defining JSON Formatter
Expand Down Expand Up @@ -65,6 +66,7 @@ func GetInstance() *Handler {
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
go handlerInstance.sender()
})
}
return handlerInstance
Expand All @@ -76,7 +78,6 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should continue adding connections on error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed return.

fmt.Printf("Error %v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue
Don't append a connection that returns an error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, updated.

return
}
h.wsConn = append(h.wsConn, c)
}
Expand All @@ -96,25 +97,85 @@ type ConnectionData struct {

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
_ = c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
// NetworkData struct to hold network data telemetry information
type NetworkData struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Struct and all its fields are exported. Is it required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I've updated this.

peers int
rateIn float64
rateOut float64
}

// NewNetworkData creates networkData struct
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData {
return &NetworkData{
peers: peers,
rateIn: rateIn,
rateOut: rateOut,
}
}

// SendNetworkData send network data system.interval message to telemetry connection
func (h *Handler) SendNetworkData(data *NetworkData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// BlockIntervalData struct to hold data for block system.interval message
type BlockIntervalData struct {
BestHash common.Hash
BestHeight *big.Int
FinalizedHash common.Hash
FinalizedHeight *big.Int
TXCount int
UsedStateCacheSize int
}

// SendBlockIntervalData send block data system interval information to telemetry connection
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint
"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint
"used_state_cache_size": data.UsedStateCacheSize}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

func (h *Handler) sender() {
for {
h.RLock()
line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter
h.RUnlock()
if err != nil {
continue
}

for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, line)
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
}
h.buf.Reset()
}
Loading