Skip to content

Commit

Permalink
Merge branch 'development' into noot/neighbour-message
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed May 5, 2021
2 parents de1e6c2 + e749a8d commit 0d309a8
Show file tree
Hide file tree
Showing 13 changed files with 294 additions and 108 deletions.
8 changes: 4 additions & 4 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,14 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
// handleBlockAnnounceMessage handles BlockAnnounce messages
// if some more blocks are required to sync the announced block, the node will open a sync stream
// with its peer and send a BlockRequest message
func (s *Service) handleBlockAnnounceMessage(peer peer.ID, msg NotificationsMessage) error {
func (s *Service) handleBlockAnnounceMessage(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) {
if an, ok := msg.(*BlockAnnounceMessage); ok {
s.syncQueue.handleBlockAnnounce(an, peer)
err := s.syncer.HandleBlockAnnounce(an)
err = s.syncer.HandleBlockAnnounce(an)
if err != nil {
return err
return false, err
}
}

return nil
return true, nil
}
3 changes: 2 additions & 1 deletion dot/network/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ func TestHandleBlockAnnounceMessage(t *testing.T) {
Number: big.NewInt(10),
}

err := s.handleBlockAnnounceMessage(peerID, msg)
propagate, err := s.handleBlockAnnounceMessage(peerID, msg)
require.NoError(t, err)
require.True(t, propagate)
}

func TestValidateBlockAnnounceHandshake(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"errors"
"path"
"time"

log "github.com/ChainSafe/log15"
"github.com/libp2p/go-libp2p-core/crypto"
Expand Down Expand Up @@ -93,6 +94,9 @@ type Config struct {

// PublishMetrics enables collection of network metrics
PublishMetrics bool

// telemetryInterval how often to send telemetry metrics
telemetryInterval time.Duration
}

// build checks the configuration, sets up the private key for the network service,
Expand Down Expand Up @@ -134,6 +138,11 @@ func (c *Config) build() error {
c.logger.Warn("Bootstrap is enabled but no bootstrap nodes are defined")
}

// set telemetryInterval to default
if c.telemetryInterval.Microseconds() == 0 {
c.telemetryInterval = time.Second * 5
}

return nil
}

Expand Down
15 changes: 13 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand Down Expand Up @@ -59,6 +60,7 @@ type host struct {
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -167,6 +169,8 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

bwc := metrics.NewBandwidthCounter()

host := &host{
ctx: ctx,
h: h,
Expand All @@ -177,6 +181,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
bwc: bwc,
}

cm.host = host
Expand Down Expand Up @@ -305,8 +310,14 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
lenBytes := uint64ToLEB128(msgLen)
encMsg = append(lenBytes, encMsg...)

_, err = s.Write(encMsg)
return err
sent, err := s.Write(encMsg)
if err != nil {
return err
}

h.bwc.LogSentMessage(int64(sent))

return nil
}

// getOutboundStream returns the outbound message stream for the given peer or returns
Expand Down
11 changes: 3 additions & 8 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type (
MessageDecoder = func([]byte) (NotificationsMessage, error)

// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) error
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)
)

type notificationsProtocol struct {
Expand Down Expand Up @@ -180,17 +180,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

err := messageHandler(peer, msg)
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if s.noGossip {
return nil
}

// TODO: we don't want to rebroadcast neighbour messages, so ignore all consensus messages for now
if _, isConsensus := msg.(*ConsensusMessage); isConsensus {
if !propagate || s.noGossip {
return nil
}

Expand Down
69 changes: 67 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"time"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"

log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -89,6 +89,10 @@ type Service struct {
noDiscover bool
noMDNS bool
noGossip bool // internal option

// telemetry
telemetryInterval time.Duration
closeCh chan interface{}
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -142,6 +146,8 @@ func NewService(cfg *Config) (*Service, error) {
syncer: cfg.Syncer,
notificationsProtocols: make(map[byte]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -245,6 +251,9 @@ func (s *Service) Start() error {
}

go s.logPeerCount()
go s.publishNetworkTelemetry(s.closeCh)
go s.sentBlockIntervalTelemetry()

return nil
}

Expand Down Expand Up @@ -278,6 +287,47 @@ func (s *Service) logPeerCount() {
}
}

func (s *Service) publishNetworkTelemetry(done chan interface{}) {
ticker := time.NewTicker(s.telemetryInterval)
defer ticker.Stop()

main:
for {
select {
case <-done:
break main

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut))
}

}
}

func (s *Service) sentBlockIntervalTelemetry() {
for {
best, err := s.blockState.BestBlockHeader()
if err != nil {
continue
}
finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}

telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
BestHash: best.Hash(),
BestHeight: best.Number,
FinalizedHash: finalized.Hash(),
FinalizedHeight: finalized.Number,
TXCount: 0, // todo (ed) determine where to get tx count
UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
})
time.Sleep(s.telemetryInterval)
}
}

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
Expand Down Expand Up @@ -343,6 +393,19 @@ func (s *Service) Stop() error {
logger.Error("Failed to close host", "error", err)
}

// check if closeCh is closed, if not, close it.
mainloop:
for {
select {
case _, hasMore := <-s.closeCh:
if !hasMore {
break mainloop
}
default:
close(s.closeCh)
}
}

return nil
}

Expand Down Expand Up @@ -524,6 +587,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder
_ = stream.Close()
return
}

s.host.bwc.LogRecvMessage(int64(tot))
}
}

Expand Down
6 changes: 3 additions & 3 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func decodeTransactionMessage(in []byte) (NotificationsMessage, error) {
return msg, err
}

func (s *Service) handleTransactionMessage(_ peer.ID, msg NotificationsMessage) error {
func (s *Service) handleTransactionMessage(_ peer.ID, msg NotificationsMessage) (bool, error) {
txMsg, ok := msg.(*TransactionMessage)
if !ok {
return errors.New("invalid transaction type")
return false, errors.New("invalid transaction type")
}

return s.transactionHandler.HandleTransactionMessage(txMsg)
return true, s.transactionHandler.HandleTransactionMessage(txMsg)
}
Loading

0 comments on commit 0d309a8

Please sign in to comment.