Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network) add network_is_major_syncing metric #1697

Merged
merged 13 commits into from
Jul 23, 2021
Merged
1 change: 1 addition & 0 deletions chain/polkadot/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[global]
basepath = "~/.gossamer/polkadot"
log = "info"
metrics-port = 9876

[log]
core = ""
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func PolkadotConfig() *Config {
LogLvl: polkadot.DefaultLvl,
RetainBlocks: gssmr.DefaultRetainBlocks,
Pruning: pruner.Mode(gssmr.DefaultPruningMode),
MetricsPort: gssmr.DefaultMetricsPort,
},
Log: LogConfig{
CoreLvl: polkadot.DefaultLvl,
Expand Down
4 changes: 1 addition & 3 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
return nil
}

go func() {
s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer)
}()
go s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer)

return nil
}
Expand Down
16 changes: 16 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
transactionsID = "/transactions/1"

maxMessageSize = 1024 * 63 // 63kb for now

gssmrIsMajorSyncMetric = "gossamer/network/is_major_syncing"
)

var (
Expand Down Expand Up @@ -692,6 +694,20 @@ func (s *Service) NodeRoles() byte {
return s.cfg.Roles
}

// CollectGauge will be used to collect coutable metrics from network service
func (s *Service) CollectGauge() map[string]int64 {
var isSynced int64
if !s.syncer.IsSynced() {
isSynced = 1
} else {
isSynced = 0
}

return map[string]int64{
gssmrIsMajorSyncMetric: isSynced,
}
}

// HighestBlock returns the highest known block number
func (s *Service) HighestBlock() int64 {
return s.syncQueue.goal
Expand Down
18 changes: 18 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,21 @@ func TestHandleConn(t *testing.T) {
require.True(t, ok)
require.Equal(t, 1, aScore)
}

func TestSerivceIsMajorSyncMetrics(t *testing.T) {
mocksyncer := new(MockSyncer)

node := &Service{
syncer: mocksyncer,
}

mocksyncer.On("IsSynced").Return(false).Once()
m := node.CollectGauge()

require.Equal(t, int64(1), m[gssmrIsMajorSyncMetric])

mocksyncer.On("IsSynced").Return(true).Once()
m = node.CollectGauge()

require.Equal(t, int64(0), m[gssmrIsMajorSyncMetric])
}
50 changes: 36 additions & 14 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ChainSafe/gossamer/dot/types"
Expand Down Expand Up @@ -194,6 +195,7 @@ func (q *syncQueue) syncAtHead() {

t := time.NewTicker(q.slotDuration * 2)
defer t.Stop()

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
Expand All @@ -207,10 +209,13 @@ func (q *syncQueue) syncAtHead() {
continue
}

goal := atomic.LoadInt64(&q.goal)

// we aren't at the head yet, sleep
if curr.Number.Int64() < q.goal && curr.Number.Cmp(prev.Number) > 0 {
if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 {
prev = curr
q.s.noGossip = true
q.s.syncer.SetSyncing(true)
continue
}

Expand Down Expand Up @@ -247,10 +252,12 @@ func (q *syncQueue) handleResponseQueue() {
}

q.responseLock.Lock()
goal := atomic.LoadInt64(&q.goal)

if len(q.responses) == 0 {
q.responseLock.Unlock()

if len(q.requestCh) == 0 && head.Int64() < q.goal {
if len(q.requestCh) == 0 && head.Int64() < goal {
q.pushRequest(uint64(head.Int64()+1), blockRequestBufferSize, "")
}
continue
Expand Down Expand Up @@ -328,6 +335,9 @@ func (q *syncQueue) prunePeers() {
}

func (q *syncQueue) benchmark() {
t := time.NewTimer(time.Second * 5)
defer t.Stop()

for {
if q.ctx.Err() != nil {
return
Expand All @@ -338,19 +348,27 @@ func (q *syncQueue) benchmark() {
continue
}

if before.Number.Int64() >= q.goal {
goal := atomic.LoadInt64(&q.goal)

if before.Number.Int64() >= goal {
finalised, err := q.s.blockState.GetFinalisedHeader(0, 0) //nolint
if err != nil {
continue
}

logger.Info("💤 node waiting", "peer count", len(q.s.host.peers()), "head", before.Number, "finalised", finalised.Number)
time.Sleep(time.Second * 5)

// reset the counter and then wait 5 seconds
t.Reset(time.Second * 5)
<-t.C

continue
}

q.benchmarker.begin(before.Number.Uint64())
time.Sleep(time.Second * 5)

t.Reset(time.Second * 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

<-t.C

after, err := q.s.blockState.BestBlockHeader()
if err != nil {
Expand All @@ -361,7 +379,7 @@ func (q *syncQueue) benchmark() {

logger.Info("🚣 currently syncing",
"peer count", len(q.s.host.peers()),
"goal", q.goal,
"goal", goal,
"average blocks/second", q.benchmarker.mostRecentAverage(),
"overall average", q.benchmarker.average(),
)
Expand Down Expand Up @@ -418,11 +436,13 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) {
return
}

if q.goal < best.Int64() {
q.goal = best.Int64()
goal := atomic.LoadInt64(&q.goal)
if goal < best.Int64() {
atomic.StoreInt64(&q.goal, best.Int64())
}

if q.goal-int64(start) < int64(blockRequestSize) {
goal = atomic.LoadInt64(&q.goal)
if goal-int64(start) < int64(blockRequestSize) {
start := best.Int64() + 1
req := createBlockRequest(start, 0)

Expand All @@ -443,7 +463,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) {
start = start - m + 1

for i := 0; i < numRequests; i++ {
if start > uint64(q.goal) {
if start > uint64(goal) {
return
}

Expand Down Expand Up @@ -833,11 +853,12 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID)
return
}

if bestNum.Int64() >= int64(blockNum) || q.goal >= int64(blockNum) {
goal := atomic.LoadInt64(&q.goal)
if bestNum.Int64() >= int64(blockNum) || goal >= int64(blockNum) {
return
}

q.goal = int64(blockNum)
atomic.StoreInt64(&q.goal, int64(blockNum))
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}

Expand All @@ -856,8 +877,9 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

if header.Number.Int64() > q.goal {
q.goal = header.Number.Int64()
goal := atomic.LoadInt64(&q.goal)
if header.Number.Int64() > goal {
atomic.StoreInt64(&q.goal, header.Number.Int64())
}

req := createBlockRequestWithHash(header.Hash(), blockRequestSize)
Expand Down
1 change: 1 addition & 0 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
c := metrics.NewCollector(context.Background())
c.AddGauge(fg)
c.AddGauge(stateSrvc)
c.AddGauge(networkSrvc)

go c.Start()

Expand Down