Skip to content

Commit

Permalink
feat (dot/telemetry): implement telemetry system.interval message (Ch…
Browse files Browse the repository at this point in the history
…ainSafe#1528)

* implement network data system interval telemetry message

* add lock to telemetry struct to fix concurrent websocket writes

* implement block data system.interval telemetry message

* address comments

* fix race condition

* fix lint

* update tests

* refactor tests

* use interface{} for channel, add recover

* rename channel doneNetworkTelemetry to closeCh

* fix check for closed channel

* fix error checking
  • Loading branch information
edwardmack authored May 5, 2021
1 parent 0d6f488 commit e749a8d
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 42 deletions.
9 changes: 9 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"errors"
"path"
"time"

log "github.com/ChainSafe/log15"
"github.com/libp2p/go-libp2p-core/crypto"
Expand Down Expand Up @@ -93,6 +94,9 @@ type Config struct {

// PublishMetrics enables collection of network metrics
PublishMetrics bool

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration
}

// build checks the configuration, sets up the private key for the network service,
Expand Down Expand Up @@ -134,6 +138,11 @@ func (c *Config) build() error {
c.logger.Warn("Bootstrap is enabled but no bootstrap nodes are defined")
}

// set telemetryInterval to default
if c.telemetryInterval.Microseconds() == 0 {
c.telemetryInterval = time.Second * 5
}

return nil
}

Expand Down
15 changes: 13 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand Down Expand Up @@ -59,6 +60,7 @@ type host struct {
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -167,6 +169,8 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

bwc := metrics.NewBandwidthCounter()

host := &host{
ctx: ctx,
h: h,
Expand All @@ -177,6 +181,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
bwc: bwc,
}

cm.host = host
Expand Down Expand Up @@ -305,8 +310,14 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
lenBytes := uint64ToLEB128(msgLen)
encMsg = append(lenBytes, encMsg...)

_, err = s.Write(encMsg)
return err
sent, err := s.Write(encMsg)
if err != nil {
return err
}

h.bwc.LogSentMessage(int64(sent))

return nil
}

// getOutboundStream returns the outbound message stream for the given peer or returns
Expand Down
69 changes: 67 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"time"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"

log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -89,6 +89,10 @@ type Service struct {
noDiscover bool
noMDNS bool
noGossip bool // internal option

// telemetry
telemetryInterval time.Duration
closeCh chan interface{}
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -142,6 +146,8 @@ func NewService(cfg *Config) (*Service, error) {
syncer: cfg.Syncer,
notificationsProtocols: make(map[byte]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -245,6 +251,9 @@ func (s *Service) Start() error {
}

go s.logPeerCount()
go s.publishNetworkTelemetry(s.closeCh)
go s.sentBlockIntervalTelemetry()

return nil
}

Expand Down Expand Up @@ -278,6 +287,47 @@ func (s *Service) logPeerCount() {
}
}

// publishNetworkTelemetry sends a network telemetry message on every tick of
// a ticker running at s.telemetryInterval. Each message carries the host's
// current peer count and the inbound/outbound rates from the bandwidth
// counter. The loop ends as soon as a receive on done succeeds (a value is
// sent or the channel is closed).
func (s *Service) publishNetworkTelemetry(done chan interface{}) {
	ticker := time.NewTicker(s.telemetryInterval)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			// returning lets the deferred ticker.Stop run
			return
		case <-ticker.C:
			totals := s.host.bwc.GetBandwidthTotals()
			handler := telemetry.GetInstance()
			handler.SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), totals.RateIn, totals.RateOut))
		}
	}
}

// sentBlockIntervalTelemetry sends a block system.interval telemetry message
// containing the best and finalized block hash and number, once per
// s.telemetryInterval.
//
// If either header cannot be fetched, that round is skipped and the next
// attempt happens after the usual interval; retrying immediately would spin
// the loop at full speed for as long as the error persists. The loop returns
// once a receive on s.closeCh succeeds, so the goroutine ends when the
// channel is closed instead of running forever.
func (s *Service) sentBlockIntervalTelemetry() {
	for {
		if best, err := s.blockState.BestBlockHeader(); err == nil {
			if finalized, ferr := s.blockState.GetFinalizedHeader(0, 0); ferr == nil { //nolint
				telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
					BestHash:           best.Hash(),
					BestHeight:         best.Number,
					FinalizedHash:      finalized.Hash(),
					FinalizedHeight:    finalized.Number,
					TXCount:            0, // todo (ed) determine where to get tx count
					UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
				})
			}
		}

		// wait for the next round, or stop if the service is shutting down
		select {
		case <-s.closeCh:
			return
		case <-time.After(s.telemetryInterval):
		}
	}
}

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
Expand Down Expand Up @@ -343,6 +393,19 @@ func (s *Service) Stop() error {
logger.Error("Failed to close host", "error", err)
}

// check if closeCh is closed, if not, close it.
mainloop:
for {
select {
case _, hasMore := <-s.closeCh:
if !hasMore {
break mainloop
}
default:
close(s.closeCh)
}
}

return nil
}

Expand Down Expand Up @@ -524,6 +587,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder
_ = stream.Close()
return
}

s.host.bwc.LogRecvMessage(int64(tot))
}
}

Expand Down
90 changes: 76 additions & 14 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
buf bytes.Buffer
wsConn []*websocket.Conn
sync.RWMutex
}

// MyJSONFormatter struct for defining JSON Formatter
Expand Down Expand Up @@ -65,6 +66,7 @@ func GetInstance() *Handler {
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
go handlerInstance.sender()
})
}
return handlerInstance
Expand All @@ -76,7 +78,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
fmt.Printf("Error %v\n", err)
return
continue
}
h.wsConn = append(h.wsConn, c)
}
Expand All @@ -96,25 +98,85 @@ type ConnectionData struct {

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
_ = c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
// NetworkData carries the values reported in the network system.interval
// telemetry message.
type NetworkData struct {
	// peers is sent as the "peers" field
	peers int
	// rateIn and rateOut are sent as "bandwidth_download" and
	// "bandwidth_upload" respectively
	rateIn, rateOut float64
}

// NewNetworkData returns a pointer to a new NetworkData holding the given
// peer count and inbound/outbound rates.
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData {
	data := new(NetworkData)
	data.peers = peers
	data.rateIn = rateIn
	data.rateOut = rateOut
	return data
}

// SendNetworkData emits a system.interval telemetry entry through the package
// logger, carrying the download/upload bandwidth and peer count from data.
// The handler lock is held for the duration of the call.
func (h *Handler) SendNetworkData(data *NetworkData) {
	h.Lock()
	defer h.Unlock()

	log.WithFields(log.Fields{
		"id": 1,
		"payload": log.Fields{
			"bandwidth_download": data.rateIn,
			"bandwidth_upload":   data.rateOut,
			"msg":                "system.interval",
			"peers":              data.peers,
		},
		"ts": time.Now(),
	}).Print()
}

// BlockIntervalData carries the values reported in the block system.interval
// telemetry message.
type BlockIntervalData struct {
	// BestHash and BestHeight are sent as "best" and "height"
	BestHash   common.Hash
	BestHeight *big.Int
	// FinalizedHash and FinalizedHeight are sent as "finalized_hash" and
	// "finalized_height"
	FinalizedHash   common.Hash
	FinalizedHeight *big.Int
	// TXCount and UsedStateCacheSize are sent as "txcount" and
	// "used_state_cache_size"
	TXCount, UsedStateCacheSize int
}

// SendBlockIntervalData emits a system.interval telemetry entry through the
// package logger, carrying the best and finalized block hash/height, the
// transaction count and the used state cache size from data. The handler lock
// is held for the duration of the call.
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
	h.Lock()
	defer h.Unlock()

	log.WithFields(log.Fields{
		"id": 1,
		"payload": log.Fields{
			"best":                  data.BestHash.String(),
			"finalized_hash":        data.FinalizedHash.String(),
			"finalized_height":      data.FinalizedHeight,
			"height":                data.BestHeight,
			"msg":                   "system.interval",
			"txcount":               data.TXCount,
			"used_state_cache_size": data.UsedStateCacheSize,
		},
		"ts": time.Now(),
	}).Print()
}

// sender forwards buffered telemetry entries to the telemetry websocket
// connections. It loops forever: each round removes one newline-terminated
// line from h.buf and writes it as a text message to every connection in
// h.wsConn. A failed write is printed and the remaining connections are still
// tried.
//
// When no complete line is available the loop pauses briefly before polling
// again; without the pause an empty buffer makes the loop spin continuously.
func (h *Handler) sender() {
	// pause between polls while the buffer has nothing to send
	const idlePoll = 50 * time.Millisecond

	for {
		// ReadBytes consumes data from the buffer, so it is a write to the
		// shared state and needs the exclusive lock rather than a read lock
		h.Lock()
		line, err := h.buf.ReadBytes('\n')
		h.Unlock()
		if err != nil {
			time.Sleep(idlePoll)
			continue
		}

		for _, c := range h.wsConn {
			if werr := c.WriteMessage(websocket.TextMessage, line); werr != nil {
				// TODO (ed) determine how to handle this error
				fmt.Printf("ERROR connecting to telemetry %v\n", werr)
			}
		}
	}
}
Loading

0 comments on commit e749a8d

Please sign in to comment.