From 3319d1fecd0903631634b37dd17ee7f6456da5b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 15 Jul 2021 10:06:20 -0400 Subject: [PATCH 1/9] chore: transform goal into atomic --- chain/polkadot/config.toml | 3 ++- dot/network/block_announce.go | 4 +-- dot/network/service.go | 15 +++++++++++ dot/network/sync.go | 51 +++++++++++++++++++++++++---------- dot/node.go | 1 + 5 files changed, 56 insertions(+), 18 deletions(-) diff --git a/chain/polkadot/config.toml b/chain/polkadot/config.toml index a7166b5a9e..bd8f03239d 100644 --- a/chain/polkadot/config.toml +++ b/chain/polkadot/config.toml @@ -1,10 +1,11 @@ [global] basepath = "~/.gossamer/polkadot" log = "info" +metrics-port = 9876 [log] core = "" -network = "" +network = "debug" rpc = "" state = "" runtime = "" diff --git a/dot/network/block_announce.go b/dot/network/block_announce.go index bf8e655195..34371ffdf4 100644 --- a/dot/network/block_announce.go +++ b/dot/network/block_announce.go @@ -241,9 +241,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err return nil } - go func() { - s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer) - }() + go s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer) return nil } diff --git a/dot/network/service.go b/dot/network/service.go index 4a9bf3626d..9ed03598b9 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -47,6 +47,8 @@ const ( transactionsID = "/transactions/1" maxMessageSize = 1024 * 63 // 63kb for now + + gssmrIsMajorSyncMetric = "gossamer/network/is_major_syncing" ) var ( @@ -674,3 +676,16 @@ func (s *Service) Peers() []common.PeerInfo { func (s *Service) NodeRoles() byte { return s.cfg.Roles } + +func (s *Service) CollectGauge() map[string]int64 { + var issyncing int64 + if !s.syncer.IsSynced() { + issyncing = 1 + } else { + issyncing = 0 + } + + return map[string]int64{ + gssmrIsMajorSyncMetric: issyncing, + } +} diff --git a/dot/network/sync.go b/dot/network/sync.go index 5ff957ec36..8cb33a8598 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -23,6 +23,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "time" "github.com/ChainSafe/gossamer/dot/types" @@ -194,6 +195,7 @@ func (q *syncQueue) syncAtHead() { t := time.NewTicker(q.slotDuration * 2) defer t.Stop() + for { select { // sleep for average block time TODO: make this configurable from slot duration @@ -207,10 +209,14 @@ func (q *syncQueue) syncAtHead() { continue } + goal := atomic.LoadInt64(&q.goal) + + logger.Debug("sync at head", "current block", curr.Number.Int64(), "previus block", prev.Number, "goal", goal) // we aren't at the head yet, sleep - if curr.Number.Int64() < q.goal && curr.Number.Cmp(prev.Number) > 0 { + if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 { prev = curr q.s.noGossip = true + q.s.syncer.SetSyncing(true) continue } @@ -247,10 +253,12 @@ func (q *syncQueue) handleResponseQueue() { } q.responseLock.Lock() + goal := atomic.LoadInt64(&q.goal) + if len(q.responses) == 0 { q.responseLock.Unlock() - if len(q.requestCh) == 0 && head.Int64() < q.goal { + if len(q.requestCh) == 0 && head.Int64() < goal { q.pushRequest(uint64(head.Int64()+1), blockRequestBufferSize, "") } continue @@ -328,6 +336,9 @@ func (q *syncQueue) prunePeers() { } func (q *syncQueue) benchmark() { + t := time.NewTimer(time.Second * 5) + defer t.Stop() + for { if q.ctx.Err() != nil { return @@ -338,19 +349,27 @@ func (q *syncQueue) benchmark() { continue } - if before.Number.Int64() >= q.goal { + goal := atomic.LoadInt64(&q.goal) + + if before.Number.Int64() >= goal { finalised, err := q.s.blockState.GetFinalisedHeader(0, 0) //nolint if err != nil { continue } logger.Info("💤 node waiting", "peer count", len(q.s.host.peers()), "head", before.Number, "finalised", finalised.Number) - time.Sleep(time.Second * 5) + + // reset the counter and then wait 5 seconds + t.Reset(time.Second * 5) + <-t.C + continue } q.benchmarker.begin(before.Number.Uint64()) - time.Sleep(time.Second * 5) + + t.Reset(time.Second * 5) + <-t.C after, err := q.s.blockState.BestBlockHeader() if err != nil { @@ -361,7 +380,7 @@ func (q *syncQueue) benchmark() { logger.Info("🚣 currently syncing", "peer count", len(q.s.host.peers()), - "goal", q.goal, + "goal", goal, "average blocks/second", q.benchmarker.mostRecentAverage(), "overall average", q.benchmarker.average(), ) @@ -418,11 +437,13 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { return } - if q.goal < best.Int64() { - q.goal = best.Int64() + goal := atomic.LoadInt64(&q.goal) + if goal < best.Int64() { + atomic.StoreInt64(&q.goal, best.Int64()) } - if q.goal-int64(start) < int64(blockRequestSize) { + goal = atomic.LoadInt64(&q.goal) + if goal-int64(start) < int64(blockRequestSize) { start := best.Int64() + 1 req := createBlockRequest(start, 0) @@ -443,7 +464,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) { start = start - m + 1 for i := 0; i < numRequests; i++ { - if start > uint64(q.goal) { + if start > uint64(goal) { return } @@ -833,11 +854,12 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID) return } - if bestNum.Int64() >= int64(blockNum) || q.goal >= int64(blockNum) { + goal := atomic.LoadInt64(&q.goal) + if bestNum.Int64() >= int64(blockNum) || goal >= int64(blockNum) { return } - q.goal = int64(blockNum) + atomic.StoreInt64(&q.goal, int64(blockNum)) q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from) } @@ -856,8 +878,9 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) return } - if header.Number.Int64() > q.goal { - q.goal = header.Number.Int64() + goal := atomic.LoadInt64(&q.goal) + if header.Number.Int64() > goal { + atomic.StoreInt64(&q.goal, header.Number.Int64()) } req := createBlockRequestWithHash(header.Hash(), blockRequestSize) diff --git a/dot/node.go b/dot/node.go index 5af09fef07..7a4732cbd5 100644 --- a/dot/node.go +++ b/dot/node.go @@ -333,6 +333,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, c := metrics.NewCollector(context.Background()) c.AddGauge(fg) c.AddGauge(stateSrvc) + c.AddGauge(networkSrvc) go c.Start() From 279293d9a580fa9808b0916ebc01bb398126f8b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 15 Jul 2021 10:47:35 -0400 Subject: [PATCH 2/9] add tests --- dot/network/service_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index cbc44f64fd..e3a9358812 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -344,3 +344,21 @@ func TestHandleConn(t *testing.T) { require.True(t, ok) require.Equal(t, 1, aScore) } + +func TestSerivceIsMajorSyncMetrics(t *testing.T) { + mocksyncer := new(MockSyncer) + + node := &Service{ + syncer: mocksyncer, + } + + mocksyncer.On("IsSynced").Return(false).Once() + m := node.CollectGauge() + + require.Equal(t, int64(1), m[gssmrIsMajorSyncMetric]) + + mocksyncer.On("IsSynced").Return(true).Once() + m = node.CollectGauge() + + require.Equal(t, int64(0), m[gssmrIsMajorSyncMetric]) +} From df84a064d9b46b508d8ba7a81d187771a8216e22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 15 Jul 2021 13:53:36 -0400 Subject: [PATCH 3/9] remove debug from polkadot config --- chain/polkadot/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/polkadot/config.toml b/chain/polkadot/config.toml index bd8f03239d..6ec7c85783 100644 --- a/chain/polkadot/config.toml +++ b/chain/polkadot/config.toml @@ -5,7 +5,7 @@ metrics-port = 9876 [log] core = "" -network = "debug" +network = "" rpc = "" state = "" runtime = "" From 8d9aaddf3f4e389edee30d6e119e13dc2a179c5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 15 Jul 2021 13:55:04 -0400 Subject: [PATCH 4/9] chore: add comment --- dot/network/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/network/service.go b/dot/network/service.go index 9ed03598b9..75f39fc0da 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -677,6 +677,7 @@ func (s *Service) NodeRoles() byte { return s.cfg.Roles } +// CollectGauge will be used to collect coutable metrics from network service func (s *Service) CollectGauge() map[string]int64 { var issyncing int64 if !s.syncer.IsSynced() { From 583e0c3de3e2a2848d7dc36ec2a5f403c16bef3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Thu, 15 Jul 2021 13:56:27 -0400 Subject: [PATCH 5/9] chore: remove comments --- dot/network/sync.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 8cb33a8598..87e286b961 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -211,7 +211,6 @@ func (q *syncQueue) syncAtHead() { goal := atomic.LoadInt64(&q.goal) - logger.Debug("sync at head", "current block", curr.Number.Int64(), "previus block", prev.Number, "goal", goal) // we aren't at the head yet, sleep if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 { prev = curr From 53266cb419d92e58a02838f4cb44c6bd51057a30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Thu, 15 Jul 2021 15:41:49 -0400 Subject: [PATCH 6/9] change var name Co-authored-by: Timothy Wu --- dot/network/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/network/service.go b/dot/network/service.go index 75f39fc0da..c65663da12 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -679,7 +679,7 @@ func (s *Service) NodeRoles() byte { // CollectGauge will be used to collect coutable metrics from network service func (s *Service) CollectGauge() map[string]int64 { - var issyncing int64 + var isSynced int64 if !s.syncer.IsSynced() { issyncing = 1 } else { From 03024be6d9ec75e596d31a42c2ac69377690c2d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Thu, 15 Jul 2021 15:43:17 -0400 Subject: [PATCH 7/9] update var name --- dot/network/service.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index c65663da12..56bf630e66 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -681,12 +681,12 @@ func (s *Service) NodeRoles() byte { func (s *Service) CollectGauge() map[string]int64 { var isSynced int64 if !s.syncer.IsSynced() { - issyncing = 1 + isSynced = 1 } else { - issyncing = 0 + isSynced = 0 } return map[string]int64{ - gssmrIsMajorSyncMetric: issyncing, + gssmrIsMajorSyncMetric: isSynced, } } From 1c425728cbc5a9cc9d050ad0b748cb8410267d16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Fri, 16 Jul 2021 14:07:01 -0400 Subject: [PATCH 8/9] chore: add metrics port --- dot/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dot/config.go b/dot/config.go index e5572e10ab..4b626e9f4b 100644 --- a/dot/config.go +++ b/dot/config.go @@ -238,6 +238,7 @@ func PolkadotConfig() *Config { LogLvl: polkadot.DefaultLvl, RetainBlocks: gssmr.DefaultRetainBlocks, Pruning: pruner.Mode(gssmr.DefaultPruningMode), + MetricsPort: gssmr.DefaultMetricsPort, }, Log: LogConfig{ CoreLvl: polkadot.DefaultLvl, From 09f64b7a3536b9d19a75f31d00a46f9949257069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20J=C3=BAnior?= Date: Fri, 23 Jul 2021 10:21:24 -0300 Subject: [PATCH 9/9] chore: add clean up to close dbs while testing --- lib/grandpa/grandpa_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/grandpa/grandpa_test.go b/lib/grandpa/grandpa_test.go index ae325d76da..645faeda0e 100644 --- a/lib/grandpa/grandpa_test.go +++ b/lib/grandpa/grandpa_test.go @@ -68,6 +68,8 @@ func newTestState(t *testing.T) *state.Service { db, err := utils.SetupDatabase(testDatadirPath, true) require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + gen, genTrie, _ := genesis.NewTestGenesisWithTrieAndHeader(t) block, err := state.NewBlockStateFromGenesis(db, testHeader) require.NoError(t, err)