From f244fcd0272fd9ee5b35da503931b5a193d8bf42 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Mon, 15 Jan 2024 19:26:54 -0400
Subject: [PATCH 01/10] chore: send a block announce handshake if nothing
 arrives in time

---
 dot/network/light.go            |   2 +-
 dot/network/notifications.go    |   3 +-
 dot/network/request_response.go |   2 +-
 dot/network/service.go          |  38 ++++++++++++
 dot/network/stream_manager.go   |   2 +-
 dot/network/sync.go             |   2 +-
 dot/sync/chain_sync.go          | 100 +++++++++++++++++++++++---------
 dot/sync/interfaces.go          |   2 +
 dot/sync/syncer.go              |   2 +-
 9 files changed, 121 insertions(+), 32 deletions(-)

diff --git a/dot/network/light.go b/dot/network/light.go
index 28d0fb3b38..5c5a0726be 100644
--- a/dot/network/light.go
+++ b/dot/network/light.go
@@ -36,7 +36,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message,
 func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
 	defer func() {
 		err := stream.Close()
-		if err != nil {
+		if err != nil && err.Error() != "stream reset" {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/network/notifications.go b/dot/network/notifications.go
index 1dcab79157..7efb59947a 100644
--- a/dot/network/notifications.go
+++ b/dot/network/notifications.go
@@ -242,8 +242,9 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream net
 	)
 
 	info.peersData.deleteOutboundHandshakeData(peerID)
+
 	err := stream.Close()
-	if err != nil {
+	if err != nil && err.Error() != "stream reset" {
 		logger.Warnf("failed to close outbound stream: %s", err)
 	}
 }
diff --git a/dot/network/request_response.go b/dot/network/request_response.go
index 09956a6d27..3dd20d5443 100644
--- a/dot/network/request_response.go
+++ b/dot/network/request_response.go
@@ -43,7 +43,7 @@ func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMess
 
 	defer func() {
 		err := stream.Close()
-		if err != nil {
+		if err != nil && err.Error() != "stream reset" {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/network/service.go b/dot/network/service.go
index 42395d98cd..008d6d74c7 100644
--- a/dot/network/service.go
+++ b/dot/network/service.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/ChainSafe/gossamer/dot/peerset"
 	"github.com/ChainSafe/gossamer/dot/telemetry"
+	"github.com/ChainSafe/gossamer/dot/types"
 	"github.com/ChainSafe/gossamer/internal/log"
 	"github.com/ChainSafe/gossamer/internal/mdns"
 	"github.com/ChainSafe/gossamer/internal/metrics"
@@ -742,3 +743,40 @@ func (s *Service) startProcessingMsg() {
 		}
 	}
 }
+
+func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
+	protocol, ok := s.notificationsProtocols[blockAnnounceMsgType]
+	if !ok {
+		panic("block announce message type not found")
+	}
+
+	handshake, err := protocol.getHandshake()
+	if err != nil {
+		return fmt.Errorf("getting handshake: %w", err)
+	}
+
+	peers := s.host.peers()
+
+	wg := sync.WaitGroup{}
+	wg.Add(len(peers))
+	for _, p := range peers {
+		protocol.peersData.setMutex(p)
+
+		go func(p peer.ID) {
+			defer wg.Done()
+			stream, err := s.sendHandshake(p, handshake, protocol)
+			if err != nil {
+				logger.Tracef("while sending block announce handshake: %v", err)
+				return
+			}
+
+			response := protocol.peersData.getOutboundHandshakeData(p)
+			if response.received && response.validated {
+				closeOutboundStream(protocol, p, stream)
+			}
+		}(p)
+	}
+
+	wg.Wait()
+	return nil
+}
diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go
index b47518c374..6ed330bd66 100644
--- a/dot/network/stream_manager.go
+++ b/dot/network/stream_manager.go
@@ -61,7 +61,7 @@ func (sm *streamManager) cleanupStreams() {
 		if time.Since(lastReceived) > cleanupStreamInterval {
 			err := stream.Close()
-			if err != nil {
+			if err != nil && err.Error() != "stream reset" {
 				logger.Warnf("failed to close inactive stream: %s", err)
 			}
 			delete(sm.streamData, id)
diff --git a/dot/network/sync.go b/dot/network/sync.go
index ce96ae9d70..2beabe3593 100644
--- a/dot/network/sync.go
+++ b/dot/network/sync.go
@@ -32,7 +32,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er
 	defer func() {
 		err := stream.Close()
-		if err != nil {
+		if err != nil && err.Error() != "stream reset" {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index eea6301a89..a96e26dcf3 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -16,6 +16,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 
 	"github.com/ChainSafe/gossamer/dot/network"
@@ -99,6 +100,54 @@ type announcedBlock struct {
 	header *types.Header
 }
 
+type peerViewSet struct {
+	mtx  sync.RWMutex
+	view map[peer.ID]peerView
+}
+
+func (p *peerViewSet) size() int {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	return len(p.view)
+}
+
+func (p *peerViewSet) values() []peerView {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	return maps.Values(p.view)
+}
+
+func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	newView := peerView{
+		who:    peerID,
+		hash:   hash,
+		number: number,
+	}
+
+	view, ok := p.view[peerID]
+	if !ok {
+		p.view[peerID] = newView
+		return
+	}
+
+	if view.number >= newView.number {
+		return
+	}
+
+	p.view[peerID] = newView
+}
+
+func newPeerViewSet(cap int) *peerViewSet {
+	return &peerViewSet{
+		view: make(map[peer.ID]peerView, cap),
+	}
+}
+
 type chainSync struct {
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -110,8 +159,7 @@ type chainSync struct {
 
 	// tracks the latest state we know of from our peers,
 	// ie. their best block hash and number
-	peerViewLock sync.RWMutex
-	peerView     map[peer.ID]peerView
+	peerViewSet *peerViewSet
 
 	// disjoint set of blocks which are known but not ready to be processed
 	// ie. we only know the hash, number, or the parent block is unknown, or the body is unknown
@@ -166,7 +214,7 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
 		telemetry:     cfg.telemetry,
 		blockState:    cfg.bs,
 		network:       cfg.net,
-		peerView:      make(map[peer.ID]peerView),
+		peerViewSet:   newPeerViewSet(cfg.maxPeers),
 		pendingBlocks: cfg.pendingBlocks,
 		syncMode:      atomicState,
 		finalisedCh:   cfg.bs.GetFinalisedNotifierChannel(),
@@ -194,6 +242,17 @@ func (cs *chainSync) waitEnoughPeersAndTarget() {
 		select {
 		case <-waitPeersTimer.C:
 			waitPeersTimer.Reset(cs.waitPeersDuration)
+			bestBlockHeader, err := cs.blockState.BestBlockHeader()
+			if err != nil {
+				logger.Errorf("failed to get best block header: %v", err)
+				continue
+			}
+
+			err = cs.network.BlockAnnounceHandshake(bestBlockHeader)
+			if err != nil {
+				logger.Errorf("failed to get handshake peer data: %v", err)
+				continue
+			}
 		case <-cs.stopCh:
 			return
 		}
@@ -310,13 +369,7 @@ func (cs *chainSync) getSyncMode() chainSyncState {
 func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash, bestNumber uint) error {
 	cs.workerPool.fromBlockAnnounce(who)
-	cs.peerViewLock.Lock()
-	cs.peerView[who] = peerView{
-		who:    who,
-		hash:   bestHash,
-		number: bestNumber,
-	}
-	cs.peerViewLock.Unlock()
+	cs.peerViewSet.update(who, bestHash, bestNumber)
 
 	if cs.getSyncMode() == bootstrap {
 		return nil
@@ -587,21 +640,19 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
 // head block number, it would leave us in bootstrap mode forever
 // it would be better to have some sort of standard deviation calculation and discard any outliers (#1861)
 func (cs *chainSync) getTarget() (uint, error) {
-	cs.peerViewLock.RLock()
-	defer cs.peerViewLock.RUnlock()
-
+	peerSetLen := cs.peerViewSet.size()
 	// in practice, this shouldn't happen, as we only start the module once we have some peer states
-	if len(cs.peerView) == 0 {
+	if peerSetLen == 0 {
 		return 0, errNoPeerViews
 	}
 
+	numbers := make([]uint, 0, peerSetLen)
 	// we are going to sort the data and remove the outliers then we will return the avg of all the valid elements
-	uintArr := make([]uint, 0, len(cs.peerView))
-	for _, ps := range cs.peerView {
-		uintArr = append(uintArr, ps.number)
+	for _, view := range cs.peerViewSet.values() {
+		numbers = append(numbers, view.number)
 	}
 
-	sum, count := nonOutliersSumCount(uintArr)
+	sum, count := nonOutliersSumCount(numbers)
 	quotientBigInt := big.NewInt(0).Div(sum, big.NewInt(int64(count)))
 	return uint(quotientBigInt.Uint64()), nil
 }
@@ -655,10 +706,10 @@ taskResultLoop:
 			taskResult.who, taskResult.err != nil, taskResult.response != nil)
 
 		if taskResult.err != nil {
-			logger.Errorf("task result: peer(%s) error: %s",
-				taskResult.who, taskResult.err)
-
 			if !errors.Is(taskResult.err, network.ErrReceivedEmptyMessage) {
+				logger.Errorf("task result: peer(%s) error: %s",
+					taskResult.who, taskResult.err)
+
 				if strings.Contains(taskResult.err.Error(), "protocols not supported") {
 					cs.network.ReportPeer(peerset.ReputationChange{
 						Value: peerset.BadProtocolValue,
@@ -1049,14 +1100,11 @@ func doResponseGrowsTheChain(response, ongoingChain []*types.BlockData, startAtB
 }
 
 func (cs *chainSync) getHighestBlock() (highestBlock uint, err error) {
-	cs.peerViewLock.RLock()
-	defer cs.peerViewLock.RUnlock()
-
-	if len(cs.peerView) == 0 {
+	if cs.peerViewSet.size() == 0 {
 		return 0, errNoPeers
 	}
 
-	for _, ps := range cs.peerView {
+	for _, ps := range cs.peerViewSet.values() {
 		if ps.number < highestBlock {
 			continue
 		}
diff --git a/dot/sync/interfaces.go b/dot/sync/interfaces.go
index bfc0575d3f..89336bf46b 100644
--- a/dot/sync/interfaces.go
+++ b/dot/sync/interfaces.go
@@ -75,6 +75,8 @@ type Network interface {
 	ReportPeer(change peerset.ReputationChange, p peer.ID)
 
 	AllConnectedPeersIDs() []peer.ID
+
+	BlockAnnounceHandshake(*types.Header) error
 }
 
 // Telemetry is the telemetry client to send telemetry messages.
diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go
index c511f2644c..ff477fc099 100644
--- a/dot/sync/syncer.go
+++ b/dot/sync/syncer.go
@@ -64,7 +64,7 @@ func NewService(cfg *Config) (*Service, error) {
 		telemetry:         cfg.Telemetry,
 		badBlocks:         cfg.BadBlocks,
 		requestMaker:      cfg.RequestMaker,
-		waitPeersDuration: 100 * time.Millisecond,
+		waitPeersDuration: 7 * time.Second,
 	}
 
 	chainSync := newChainSync(csCfg)

From 46b3cebee0c6c4cf8fc8633390022ac1851da957 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 09:53:26 -0400
Subject: [PATCH 02/10] chore: update mocks and fix broken tests

---
 dot/sync/chain_sync.go      |  8 ++++++++
 dot/sync/chain_sync_test.go | 26 ++++++++++++++------------
 dot/sync/mocks_test.go      | 14 ++++++++++++++
 3 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index a96e26dcf3..1601ff0ed8 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -105,6 +105,14 @@ type peerViewSet struct {
 	view map[peer.ID]peerView
 }
 
+func (p *peerViewSet) find(pID peer.ID) (view peerView, ok bool) {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	view, ok = p.view[pID]
+	return view, ok
+}
+
 func (p *peerViewSet) size() int {
 	p.mtx.RLock()
 	defer p.mtx.RUnlock()
diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go
index f4fb045074..67e1b6c9ea 100644
--- a/dot/sync/chain_sync_test.go
+++ b/dot/sync/chain_sync_test.go
@@ -319,7 +319,7 @@ func Test_chainSync_onBlockAnnounceHandshake_tipModeNeedToCatchup(t *testing.T)
 	chainSync := &chainSync{
 		stopCh:        stopCh,
-		peerView:      make(map[peer.ID]peerView),
+		peerViewSet:   newPeerViewSet(10),
 		syncMode:      state,
 		pendingBlocks: newDisjointBlockSet(0),
 		workerPool:    newSyncWorkerPool(networkMock, requestMaker),
@@ -417,7 +417,7 @@ func TestChainSync_onBlockAnnounceHandshake_onBootstrapMode(t *testing.T) {
 			cs := tt.newChainSync(t, ctrl)
 			cs.onBlockAnnounceHandshake(tt.peerID, tt.bestHash, tt.bestNumber)
 
-			view, exists := cs.peerView[tt.peerID]
+			view, exists := cs.peerViewSet.find(tt.peerID)
 			require.True(t, exists)
 			require.Equal(t, tt.peerID, view.who)
 			require.Equal(t, tt.bestHash, view.hash)
@@ -486,7 +486,7 @@ func setupChainSyncToBootstrapMode(t *testing.T, blocksAhead uint,
 	}
 
 	chainSync := newChainSync(cfg)
-	chainSync.peerView = peerViewMap
+	chainSync.peerViewSet = &peerViewSet{view: peerViewMap}
 	chainSync.syncMode.Store(bootstrap)
 
 	return chainSync
@@ -1641,21 +1641,23 @@ func TestChainSync_getHighestBlock(t *testing.T) {
 	cases := map[string]struct {
 		expectedHighestBlock uint
 		wantErr              error
-		chainSyncPeerView    map[peer.ID]peerView
+		chainSyncPeerViewSet *peerViewSet
 	}{
 		"no_peer_view": {
 			wantErr:              errNoPeers,
 			expectedHighestBlock: 0,
-			chainSyncPeerView:    make(map[peer.ID]peerView),
+			chainSyncPeerViewSet: newPeerViewSet(10),
 		},
 		"highest_block": {
 			expectedHighestBlock: 500,
-			chainSyncPeerView: map[peer.ID]peerView{
-				peer.ID("peer-A"): {
-					number: 100,
-				},
-				peer.ID("peer-B"): {
-					number: 500,
+			chainSyncPeerViewSet: &peerViewSet{
+				view: map[peer.ID]peerView{
+					peer.ID("peer-A"): {
+						number: 100,
+					},
+					peer.ID("peer-B"): {
+						number: 500,
+					},
 				},
 			},
 		},
 	}
 
 	for name, tt := range cases {
 		tt := tt
 		t.Run(name, func(t *testing.T) {
 			t.Parallel()
 
 			chainSync := &chainSync{
-				peerView: tt.chainSyncPeerView,
+				peerViewSet: tt.chainSyncPeerViewSet,
 			}
 
 			highestBlock, err := chainSync.getHighestBlock()
diff --git a/dot/sync/mocks_test.go b/dot/sync/mocks_test.go
index 9ab3ea3acb..8bc7f13977 100644
--- a/dot/sync/mocks_test.go
+++ b/dot/sync/mocks_test.go
@@ -597,6 +597,20 @@ func (mr *MockNetworkMockRecorder) AllConnectedPeersIDs() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllConnectedPeersIDs", reflect.TypeOf((*MockNetwork)(nil).AllConnectedPeersIDs))
 }
 
+// BlockAnnounceHandshake mocks base method.
+func (m *MockNetwork) BlockAnnounceHandshake(arg0 *types.Header) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "BlockAnnounceHandshake", arg0)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// BlockAnnounceHandshake indicates an expected call of BlockAnnounceHandshake.
+func (mr *MockNetworkMockRecorder) BlockAnnounceHandshake(arg0 any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockAnnounceHandshake", reflect.TypeOf((*MockNetwork)(nil).BlockAnnounceHandshake), arg0)
+}
+
 // Peers mocks base method.
 func (m *MockNetwork) Peers() []common.PeerInfo {
 	m.ctrl.T.Helper()

From 746d6adb80a0f6f935b92db4b8fae671f53edc89 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 10:18:48 -0400
Subject: [PATCH 03/10] chore: move peer view set to its own file and
 introduce `getTarget` method

---
 dot/network/errors.go           |  1 +
 dot/network/light.go            |  2 +-
 dot/network/notifications.go    |  2 +-
 dot/network/request_response.go |  2 +-
 dot/network/stream_manager.go   |  2 +-
 dot/network/sync.go             |  2 +-
 dot/sync/chain_sync.go          | 99 ++-------------------------------
 dot/sync/chain_sync_test.go     | 46 +++++++--------
 dot/sync/peer_view.go           | 92 ++++++++++++++++++++++++++++++
 9 files changed, 122 insertions(+), 126 deletions(-)
 create mode 100644 dot/sync/peer_view.go

diff --git a/dot/network/errors.go b/dot/network/errors.go
index 640f15c4ba..ab58f64929 100644
--- a/dot/network/errors.go
+++ b/dot/network/errors.go
@@ -22,4 +22,5 @@ var (
 	ErrNilStream                = errors.New("nil stream")
 	ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
 	ErrGreaterThanMaxSize       = errors.New("greater than maximum size")
+	ErrStreamReset              = errors.New("stream reset")
 )
diff --git a/dot/network/light.go b/dot/network/light.go
index 5c5a0726be..0943cb823d 100644
--- a/dot/network/light.go
+++ b/dot/network/light.go
@@ -36,7 +36,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message,
 func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
 	defer func() {
 		err := stream.Close()
-		if err != nil && err.Error() != "stream reset" {
+		if err != nil && err.Error() != ErrStreamReset.Error() {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/network/notifications.go b/dot/network/notifications.go
index 7efb59947a..dac2970be6 100644
--- a/dot/network/notifications.go
+++ b/dot/network/notifications.go
@@ -244,7 +244,7 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream net
 	info.peersData.deleteOutboundHandshakeData(peerID)
 
 	err := stream.Close()
-	if err != nil && err.Error() != "stream reset" {
+	if err != nil && err.Error() != ErrStreamReset.Error() {
 		logger.Warnf("failed to close outbound stream: %s", err)
 	}
 }
diff --git a/dot/network/request_response.go b/dot/network/request_response.go
index 3dd20d5443..3a3b732cf1 100644
--- a/dot/network/request_response.go
+++ b/dot/network/request_response.go
@@ -43,7 +43,7 @@ func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMess
 
 	defer func() {
 		err := stream.Close()
-		if err != nil && err.Error() != "stream reset" {
+		if err != nil && err.Error() != ErrStreamReset.Error() {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go
index 6ed330bd66..c4554c10d6 100644
--- a/dot/network/stream_manager.go
+++ b/dot/network/stream_manager.go
@@ -61,7 +61,7 @@ func (sm *streamManager) cleanupStreams() {
 		if time.Since(lastReceived) > cleanupStreamInterval {
 			err := stream.Close()
-			if err != nil && err.Error() != "stream reset" {
+			if err != nil && err.Error() != ErrStreamReset.Error() {
 				logger.Warnf("failed to close inactive stream: %s", err)
 			}
 			delete(sm.streamData, id)
diff --git a/dot/network/sync.go b/dot/network/sync.go
index 2beabe3593..c8d3e7f432 100644
--- a/dot/network/sync.go
+++ b/dot/network/sync.go
@@ -32,7 +32,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er
 	defer func() {
 		err := stream.Close()
-		if err != nil && err.Error() != "stream reset" {
+		if err != nil && err.Error() != ErrStreamReset.Error() {
 			logger.Warnf("failed to close stream: %s", err)
 		}
 	}()
diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index 1601ff0ed8..853453f896 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -7,7 +7,6 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"math/big"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -16,7 +15,6 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 
 	"github.com/ChainSafe/gossamer/dot/network"
@@ -99,63 +97,6 @@ type announcedBlock struct {
 	who    peer.ID
 	header *types.Header
 }
-
-type peerViewSet struct {
-	mtx  sync.RWMutex
-	view map[peer.ID]peerView
-}
-
-func (p *peerViewSet) find(pID peer.ID) (view peerView, ok bool) {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	view, ok = p.view[pID]
-	return view, ok
-}
-
-func (p *peerViewSet) size() int {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	return len(p.view)
-}
-
-func (p *peerViewSet) values() []peerView {
-	p.mtx.RLock()
-	defer p.mtx.RUnlock()
-
-	return maps.Values(p.view)
-}
-
-func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) {
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
-
-	newView := peerView{
-		who:    peerID,
-		hash:   hash,
-		number: number,
-	}
-
-	view, ok := p.view[peerID]
-	if !ok {
-		p.view[peerID] = newView
-		return
-	}
-
-	if view.number >= newView.number {
-		return
-	}
-
-	p.view[peerID] = newView
-}
-
-func newPeerViewSet(cap int) *peerViewSet {
-	return &peerViewSet{
-		view: make(map[peer.ID]peerView, cap),
-	}
-}
-
 type chainSync struct {
 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -176,10 +117,10 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
 	for {
 		cs.workerPool.useConnectedPeers()
 
-		_, err := cs.getTarget()
+		target := cs.peerViewSet.getTarget()
 		totalAvailable := cs.workerPool.totalWorkers()
 
-		if totalAvailable >= uint(cs.minPeers) && err == nil {
+		if totalAvailable >= uint(cs.minPeers) && target > 0 {
 			return
 		}
 
@@ -306,11 +247,6 @@ func (cs *chainSync) stop() error {
 
 func (cs *chainSync) isBootstrap() (bestBlockHeader *types.Header, syncTarget uint,
 	isBootstrap bool, err error) {
-	syncTarget, err = cs.getTarget()
-	if err != nil {
-		return nil, syncTarget, false, fmt.Errorf("getting target: %w", err)
-	}
-
 	bestBlockHeader, err = cs.blockState.BestBlockHeader()
 	if err != nil {
 		return nil, syncTarget, false, fmt.Errorf("getting best block header: %w", err)
@@ -318,7 +254,7 @@ func (cs *chainSync) isBootstrap() (bestBlockHeader *types.Header, syncTarget ui
 
 	bestBlockNumber := bestBlockHeader.Number
 	isBootstrap = bestBlockNumber+network.MaxBlocksInResponse < syncTarget
-	return bestBlockHeader, syncTarget, isBootstrap, nil
+	return bestBlockHeader, cs.peerViewSet.getTarget(), isBootstrap, nil
 }
 
 func (cs *chainSync) bootstrapSync() {
@@ -615,10 +551,7 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
 	// we should bound it to the real target which is collected through
 	// block announces received from other peers
 	targetBlockNumber := startRequestAt + maxRequestsAllowed*128
-	realTarget, err := cs.getTarget()
-	if err != nil {
-		return fmt.Errorf("while getting target: %w", err)
-	}
+	realTarget := cs.peerViewSet.getTarget()
 
 	if targetBlockNumber > realTarget {
 		targetBlockNumber = realTarget
@@ -635,7 +568,7 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
 	}
 
 	resultsQueue := cs.workerPool.submitRequests(requests)
-	err = cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks)
+	err := cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks)
 	if err != nil {
 		return fmt.Errorf("while handling workers results: %w", err)
 	}
@@ -643,28 +576,6 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
 	return nil
 }
 
-// getTarget takes the average of all peer heads
-// TODO: should we just return the highest? could be an attack vector potentially, if a peer reports some very large
-// head block number, it would leave us in bootstrap mode forever
-// it would be better to have some sort of standard deviation calculation and discard any outliers (#1861)
-func (cs *chainSync) getTarget() (uint, error) {
-	peerSetLen := cs.peerViewSet.size()
-	// in practice, this shouldn't happen, as we only start the module once we have some peer states
-	if peerSetLen == 0 {
-		return 0, errNoPeerViews
-	}
-
-	numbers := make([]uint, 0, peerSetLen)
-	// we are going to sort the data and remove the outliers then we will return the avg of all the valid elements
-	for _, view := range cs.peerViewSet.values() {
-		numbers = append(numbers, view.number)
-	}
-
-	sum, count := nonOutliersSumCount(numbers)
-	quotientBigInt := big.NewInt(0).Div(sum, big.NewInt(int64(count)))
-	return uint(quotientBigInt.Uint64()), nil
-}
-
 // handleWorkersResults, every time we submit requests to workers they results should be computed here
 // and every cicle we should endup with a complete chain, whenever we identify
 // any error from a worker we should evaluate the error and re-insert the request
diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go
index 67e1b6c9ea..06433f2321 100644
--- a/dot/sync/chain_sync_test.go
+++ b/dot/sync/chain_sync_test.go
@@ -546,8 +546,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorker(t *testing.T) {
 		mockedBlockState, mockedNetwork, mockedRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(128), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("noot"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -631,8 +630,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithTwoWorkers(t *testing.T) {
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("noot"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("noot2"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -725,8 +723,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing.
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -825,8 +822,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithProtocolNotSupported(t *test
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -927,8 +923,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithNilHeaderInResponse(t *testi
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -1025,8 +1020,8 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithResponseIsNotAChain(t *testi
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
+
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -1139,8 +1134,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithReceivedBadBlock(t *testing.
 
 	cs.badBlocks = []string{fakeBadBlockHash.String()}
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -1225,13 +1219,12 @@ func TestChainSync_BootstrapSync_SucessfulSync_ReceivedPartialBlockData(t *testi
 		mockBlockState, mockNetwork, mockRequestMaker,
 		mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry)
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.NoError(t, err)
 
 	err = cs.workerPool.stop()
@@ -1749,8 +1742,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithInvalidJusticationBlock(t *t
 
 	cs.finalityGadget = mockFinalityGadget
 
-	target, err := cs.getTarget()
-	require.NoError(t, err)
+	target := cs.peerViewSet.getTarget()
 	require.Equal(t, uint(blocksAhead), target)
 
 	// include a new worker in the worker pool set, this worker
 	// the worker pool executes the workers management
 	cs.workerPool.fromBlockAnnounce(peer.ID("alice"))
 	//cs.workerPool.fromBlockAnnounce(peer.ID("bob"))
 
-	err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
+	err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync)
 	require.ErrorIs(t, err, errVerifyBlockJustification)
 
 	err = cs.workerPool.stop()
diff --git a/dot/sync/peer_view.go b/dot/sync/peer_view.go
new file mode 100644
index 0000000000..8fad4146ae
--- /dev/null
+++ b/dot/sync/peer_view.go
@@ -0,0 +1,92 @@
+// Copyright 2024 ChainSafe Systems (ON)
+// SPDX-License-Identifier: LGPL-3.0-only
+
+package sync
+
+import (
+	"math/big"
+	"sync"
+
+	"github.com/ChainSafe/gossamer/lib/common"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"golang.org/x/exp/maps"
+)
+
+type peerViewSet struct {
+	mtx    sync.RWMutex
+	view   map[peer.ID]peerView
+	target uint
+}
+
+// getTarget takes the average of all peer views best number
+func (p *peerViewSet) getTarget() uint {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	numbers := make([]uint, 0, len(p.view))
+	// we are going to sort the data and remove the outliers then we will return the avg of all the valid elements
+	for _, view := range maps.Values(p.view) {
+		numbers = append(numbers, view.number)
+	}
+
+	sum, count := nonOutliersSumCount(numbers)
+	quotientBigInt := uint(big.NewInt(0).Div(sum, big.NewInt(int64(count))).Uint64())
+
+	if p.target >= quotientBigInt {
+		return p.target
+	}
+
+	p.target = quotientBigInt // cache latest calculated target
+	return p.target
+}
+
+func (p *peerViewSet) find(pID peer.ID) (view peerView, ok bool) {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	view, ok = p.view[pID]
+	return view, ok
+}
+
+func (p *peerViewSet) size() int {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	return len(p.view)
+}
+
+func (p *peerViewSet) values() []peerView {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	return maps.Values(p.view)
+}
+
+func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	newView := peerView{
+		who:    peerID,
+		hash:   hash,
+		number: number,
+	}
+
+	view, ok := p.view[peerID]
+	if !ok {
+		p.view[peerID] = newView
+		return
+	}
+
+	if view.number >= newView.number {
+		return
+	}
+
+	p.view[peerID] = newView
+}
+
+func newPeerViewSet(cap int) *peerViewSet {
+	return &peerViewSet{
+		view: make(map[peer.ID]peerView, cap),
+	}
+}

From f43b9de2f33783fc89ecfb9ea3e7d0b58b5a2075 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 10:21:51 -0400
Subject: [PATCH 04/10] chore: make `peerViewSet.update` simpler

---
 dot/sync/peer_view.go | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/dot/sync/peer_view.go b/dot/sync/peer_view.go
index 8fad4146ae..3e654b5927 100644
--- a/dot/sync/peer_view.go
+++ b/dot/sync/peer_view.go
@@ -73,12 +73,7 @@ func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) {
 	}
 
 	view, ok := p.view[peerID]
-	if !ok {
-		p.view[peerID] = newView
-		return
-	}
-
-	if view.number >= newView.number {
+	if ok && view.number >= newView.number {
 		return
 	}
 

From 92cb283dd664f7e884af2d64000f0a9e6eee39f1 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 11:58:44 -0400
Subject: [PATCH 05/10] chore: don't wait if there are bootnodes available

---
 dot/network/errors.go  |  1 +
 dot/network/service.go |  7 +++++--
 dot/sync/chain_sync.go | 41 +++++++++++++++++++----------------------
 dot/sync/peer_view.go  |  7 +++++++
 dot/sync/syncer.go     |  2 +-
 5 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/dot/network/errors.go b/dot/network/errors.go
index ab58f64929..b3121c69a6 100644
--- a/dot/network/errors.go
+++ b/dot/network/errors.go
@@ -8,6 +8,7 @@ import (
 )
 
 var (
+	ErrNoPeersConnected     = errors.New("no peers connected")
 	ErrReceivedEmptyMessage = errors.New("received empty message")
 
 	errCannotValidateHandshake = errors.New("failed to validate handshake")
diff --git a/dot/network/service.go b/dot/network/service.go
index 008d6d74c7..151c0c5ebb 100644
--- a/dot/network/service.go
+++ b/dot/network/service.go
@@ -745,6 +745,11 @@ func (s *Service) startProcessingMsg() {
 }
 
 func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
+	peers := s.host.peers()
+	if len(peers) < 1 {
+		return ErrNoPeersConnected
+	}
+
 	protocol, ok := s.notificationsProtocols[blockAnnounceMsgType]
 	if !ok {
 		panic("block announce message type not found")
 	}
@@ -755,8 +760,6 @@ func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
 		return fmt.Errorf("getting handshake: %w", err)
 	}
 
-	peers := s.host.peers()
-
 	wg := sync.WaitGroup{}
 	wg.Add(len(peers))
 	for _, p := range peers {
diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index 853453f896..7a5c5aaa9e 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -69,13 +69,6 @@ var (
 	})
 )
 
-// peerView tracks our peers's best reported blocks
-type peerView struct {
-	who    peer.ID
-	hash   common.Hash
-	number uint
-}
-
 // ChainSync contains the methods used by the high-level service into the `chainSync` module
 type ChainSync interface {
 	start()
@@ -176,32 +169,31 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
 	}
 }
 
-func (cs *chainSync) waitEnoughPeersAndTarget() {
+func (cs *chainSync) retrieveTargetFromBootnodes() error {
+	highestFinalizedHeader, err := cs.blockState.GetHighestFinalisedHeader()
+	if err != nil {
+		return err
+	}
+
+	return cs.network.BlockAnnounceHandshake(highestFinalizedHeader)
+}
+
+func (cs *chainSync) waitWorkersAndTarget() {
 	waitPeersTimer := time.NewTimer(cs.waitPeersDuration)
 
 	for {
 		cs.workerPool.useConnectedPeers()
 
-		target := cs.peerViewSet.getTarget()
-		totalAvailable := cs.workerPool.totalWorkers()
-
-		if totalAvailable >= uint(cs.minPeers) && target > 0 {
+		totalAvailable := cs.workerPool.totalWorkers()
+
+		if totalAvailable >= uint(cs.minPeers) &&
+			cs.peerViewSet.getTarget() > 0 {
 			return
 		}
 
 		select {
 		case <-waitPeersTimer.C:
 			waitPeersTimer.Reset(cs.waitPeersDuration)
-			bestBlockHeader, err := cs.blockState.BestBlockHeader()
-			if err != nil {
-				logger.Errorf("failed to get best block header: %v", err)
-				continue
-			}
 
-			err = cs.network.BlockAnnounceHandshake(bestBlockHeader)
-			if err != nil {
-				logger.Errorf("failed to get handshake peer data: %v", err)
-				continue
-			}
 		case <-cs.stopCh:
 			return
 		}
@@ -215,8 +207,13 @@ func (cs *chainSync) start() {
 	cs.wg.Add(1)
 	go cs.pendingBlocks.run(cs.finalisedCh, cs.stopCh, &cs.wg)
 
+	err := cs.retrieveTargetFromBootnodes()
+	if err != nil {
+		logger.Errorf("failed to retrieve target from bootnodes: %v", err)
+	}
+
 	// wait until we have a minimal workers in the sync worker pool
-	cs.waitEnoughPeersAndTarget()
+	cs.waitWorkersAndTarget()
 }
 
 func (cs *chainSync) stop() error {
diff --git a/dot/sync/peer_view.go b/dot/sync/peer_view.go
index 3e654b5927..19c6c3d398 100644
--- a/dot/sync/peer_view.go
+++ b/dot/sync/peer_view.go
@@ -12,6 +12,13 @@ import (
 	"golang.org/x/exp/maps"
 )
 
+// peerView tracks our peers' best reported blocks
+type peerView struct {
+	who    peer.ID
+	hash   common.Hash
+	number uint
+}
+
 type peerViewSet struct {
 	mtx  sync.RWMutex
 	view map[peer.ID]peerView
diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go
index ff477fc099..c511f2644c 100644
--- a/dot/sync/syncer.go
+++ b/dot/sync/syncer.go
@@ -64,7 +64,7 @@ func NewService(cfg *Config) (*Service, error) {
 		telemetry:         cfg.Telemetry,
 		badBlocks:         cfg.BadBlocks,
 		requestMaker:      cfg.RequestMaker,
-		waitPeersDuration: 7 * time.Second,
+		waitPeersDuration: 100 * time.Millisecond,
 	}
 
 	chainSync := newChainSync(csCfg)

From 06b8aae2105a9f573b214b4ebe5f493665522b51 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 15:13:04 -0400
Subject: [PATCH 06/10] chore: fix broken tests

---
 dot/sync/chain_sync.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index 7a5c5aaa9e..ba99406f4a 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -249,9 +249,10 @@ func (cs *chainSync) isBootstrap() (bestBlockHeader *types.Header, syncTarget ui
 		return nil, syncTarget, false, fmt.Errorf("getting best block header: %w", err)
 	}
 
+	syncTarget = cs.peerViewSet.getTarget()
 	bestBlockNumber := bestBlockHeader.Number
 	isBootstrap = bestBlockNumber+network.MaxBlocksInResponse < syncTarget
-	return bestBlockHeader, cs.peerViewSet.getTarget(), isBootstrap, nil
+	return bestBlockHeader, syncTarget, isBootstrap, nil
 }
 
 func (cs *chainSync) bootstrapSync() {
@@ -310,7 +310,6 @@ func (cs *chainSync) getSyncMode() chainSyncState {
 // onBlockAnnounceHandshake sets a peer's best known block
 func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash, bestNumber uint) error {
 	cs.workerPool.fromBlockAnnounce(who)
-
 	cs.peerViewSet.update(who, bestHash, bestNumber)
 
 	if cs.getSyncMode() == bootstrap {

From d972609fa07bf226d4a116149562c2984ed4c1e9 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 16:03:38 -0400
Subject: [PATCH 07/10] chore: fix broken tests, rename `isBootstrap` to
 `currentSyncInformations`

---
 dot/sync/chain_sync.go      |  34 +++++------
 dot/sync/chain_sync_test.go | 110 +++++++++++++++++++-----------------
 dot/sync/errors.go          |   1 -
 dot/sync/peer_view.go       |   4 ++
 4 files changed, 75 insertions(+), 74 deletions(-)

diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index ba99406f4a..0deaedb441 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -242,7 +242,7 @@ func (cs *chainSync) stop() error {
 	}
 }
 
-func (cs *chainSync) isBootstrap() (bestBlockHeader *types.Header, syncTarget uint,
+func (cs *chainSync) currentSyncInformations() (bestBlockHeader *types.Header, syncTarget uint,
 	isBootstrap bool, err error) {
 	bestBlockHeader, err = cs.blockState.BestBlockHeader()
 	if err != nil {
@@ -266,9 +266,9 @@ func (cs *chainSync) bootstrapSync() {
 		default:
 		}
 
-		bestBlockHeader, syncTarget, isFarFromTarget, err := cs.isBootstrap()
-		if err != nil && !errors.Is(err, errNoPeerViews) {
-			logger.Criticalf("ending bootstrap sync, checking target distance: %s", err)
+		bestBlockHeader, syncTarget, isBootstrap, err := cs.currentSyncInformations()
+		if err != nil {
+			logger.Criticalf("ending bootstrap sync, getting current sync info: %s", err)
 			return
 		}
 
@@ -287,7 +287,7 @@ func (cs *chainSync) bootstrapSync() {
 			cs.workerPool.totalWorkers(),
 			syncTarget, finalisedHeader.Number, finalisedHeader.Hash())
 
-		if isFarFromTarget {
+		if isBootstrap {
 			cs.workerPool.useConnectedPeers()
 			err = cs.requestMaxBlocksFrom(bestBlockHeader, networkInitialSync)
 			if err != nil {
@@ -316,12 +316,12 @@ func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash,
 		return nil
 	}
 
-	_, _, isFarFromTarget, err := cs.isBootstrap()
-	if err != nil && !errors.Is(err, errNoPeerViews) {
-		return fmt.Errorf("checking target distance: %w", err)
+	_, _, isBootstrap, err := cs.currentSyncInformations()
+	if err != nil {
+		return fmt.Errorf("getting current sync info: %w", err)
 	}
 
-	if !isFarFromTarget {
+	if !isBootstrap {
 		return nil
 	}
 
@@ -338,7 +338,6 @@ func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash,
 func (cs *chainSync) onBlockAnnounce(announced announcedBlock) error {
 	// TODO: https://github.com/ChainSafe/gossamer/issues/3432
 	cs.workerPool.fromBlockAnnounce(announced.who)
-
 	if cs.pendingBlocks.hasBlock(announced.header.Hash()) {
 		return fmt.Errorf("%w: block %s (#%d)",
 			errAlreadyInDisjointSet, announced.header.Hash(), announced.header.Number)
@@ -353,19 +352,19 @@ func (cs *chainSync) onBlockAnnounce(announced announcedBlock) error {
 		return nil
 	}
 
-	_, _, isFarFromTarget, err := cs.isBootstrap()
-	if err != nil && !errors.Is(err, errNoPeerViews) {
-		return fmt.Errorf("checking target distance: %w", err)
+	bestBlockHeader, _, isFarFromTarget, err := cs.currentSyncInformations()
+	if err != nil {
+		return fmt.Errorf("getting current sync info: %w", err)
 	}
 
 	if !isFarFromTarget {
-		return cs.requestAnnouncedBlock(announced)
+		return cs.requestAnnouncedBlock(bestBlockHeader, announced)
 	}
 
 	return nil
 }
 
-func (cs *chainSync) requestAnnouncedBlock(announce announcedBlock) error {
+func (cs *chainSync) requestAnnouncedBlock(bestBlockHeader *types.Header, announce announcedBlock) error {
 	peerWhoAnnounced := announce.who
 	announcedHash := announce.header.Hash()
 	announcedNumber := announce.header.Number
@@ -379,11 +378,6 @@ func (cs *chainSync) requestAnnouncedBlock(announce announcedBlock) error {
 		return nil
 	}
 
-	bestBlockHeader, err := cs.blockState.BestBlockHeader()
-	if err != nil {
-		return fmt.Errorf("getting best block header: %w", err)
-	}
-
 	highestFinalizedHeader, err := cs.blockState.GetHighestFinalisedHeader()
 	if err != nil {
 		return fmt.Errorf("getting highest finalized header")
diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go
index 359022279e..36f18c35a8 100644
--- a/dot/sync/chain_sync_test.go
+++ b/dot/sync/chain_sync_test.go
@@ -63,7 +63,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
 	t.Parallel()
 
 	const somePeer = peer.ID("abc")
-	errTest := errors.New("test error")
+	//errTest := errors.New("test error")
 
 	emptyTrieState := storage.NewTrieState(trie.NewEmptyTrie())
 	block1AnnounceHeader := types.NewHeader(common.Hash{}, emptyTrieState.MustRoot(trie.NoMaxInlineValueSize),
 		common.Hash{}, 1, scale.VaryingDataTypeSlice{})
@@ -80,58 +80,61 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
 		errMessage          string
 		expectedSyncMode    chainSyncState
 	}{
-		"announced_block_already_exists_in_disjoint_set": {
-			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-				pendingBlocks := NewMockDisjointBlockSet(ctrl)
-				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(true)
-				return &chainSync{
-					stopCh:        make(chan struct{}),
-					pendingBlocks: pendingBlocks,
-					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-				}
-			},
-			peerID:              somePeer,
-			blockAnnounceHeader: block2AnnounceHeader,
-			errWrapped:          errAlreadyInDisjointSet,
-			errMessage: fmt.Sprintf("already in disjoint set: block %s (#%d)",
-				block2AnnounceHeader.Hash(), block2AnnounceHeader.Number),
-		},
-		"failed_to_add_announced_block_in_disjoint_set": {
-			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-				pendingBlocks := NewMockDisjointBlockSet(ctrl)
-				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
-				pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(errTest)
-
-				return &chainSync{
-					stopCh:        make(chan struct{}),
-					pendingBlocks: pendingBlocks,
-					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-				}
-			},
-			peerID:              somePeer,
-			blockAnnounceHeader: block2AnnounceHeader,
-			errWrapped:          errTest,
-			errMessage:          "while adding pending block header: test error",
-		},
-		"announced_block_while_in_bootstrap_mode": {
-			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-				pendingBlocks := NewMockDisjointBlockSet(ctrl)
-				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
-				pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(nil)
-
-				state := atomic.Value{}
-				state.Store(bootstrap)
-
-				return &chainSync{
-					stopCh:        make(chan struct{}),
-					pendingBlocks: pendingBlocks,
-					syncMode:      state,
-					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-				}
-			},
-			peerID:              somePeer,
-			blockAnnounceHeader: block2AnnounceHeader,
-		},
+		// "announced_block_already_exists_in_disjoint_set": {
+		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
+		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(true)
+		// 		return &chainSync{
+		// 			stopCh:        make(chan struct{}),
+		// 			pendingBlocks: pendingBlocks,
+		// 			peerViewSet:   newPeerViewSet(0),
+		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+		// 		}
+		// 	},
+		// 	peerID:              somePeer,
+		// 	blockAnnounceHeader: block2AnnounceHeader,
+		// 	errWrapped:          errAlreadyInDisjointSet,
+		// 	errMessage: fmt.Sprintf("already in disjoint set: block %s (#%d)",
+		// 		block2AnnounceHeader.Hash(), block2AnnounceHeader.Number),
+		// },
+		// "failed_to_add_announced_block_in_disjoint_set": {
+		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
+		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
+		// 		pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(errTest)
+
+		// 		return &chainSync{
+		// 			stopCh:        make(chan struct{}),
+		// 			pendingBlocks: pendingBlocks,
+		// 			peerViewSet:   newPeerViewSet(0),
+		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+		// 		}
+		// 	},
+		// 	peerID:              somePeer,
+		// 	blockAnnounceHeader: block2AnnounceHeader,
+		// 	errWrapped:          errTest,
+		// 	errMessage:          "while adding pending block header: test error",
+		// },
+		// "announced_block_while_in_bootstrap_mode": {
+		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
+		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
+		// 		pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(nil)

+		// 		state := atomic.Value{}
+		// 		state.Store(bootstrap)

+		// 		return &chainSync{
+		// 			stopCh:        make(chan struct{}),
+		// 			pendingBlocks: pendingBlocks,
+		// 			syncMode:      state,
+		// 			peerViewSet:   newPeerViewSet(0),
+		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+		// 		}
+		// 	},
+		// 	peerID:              somePeer,
+		// 	blockAnnounceHeader: block2AnnounceHeader,
+		// },
 		"announced_block_while_in_tip_mode": {
 			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
 				pendingBlocksMock := NewMockDisjointBlockSet(ctrl)
@@ -205,6 +208,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
 					telemetry:          telemetryMock,
 					storageState:       storageStateMock,
 					blockImportHandler: importHandlerMock,
+					peerViewSet:        newPeerViewSet(0),
 				}
 			},
 			peerID:              somePeer,
diff --git a/dot/sync/errors.go b/dot/sync/errors.go
index 564c878422..08f89cacba 100644
--- a/dot/sync/errors.go
+++ b/dot/sync/errors.go
@@ -20,7 +20,6 @@ var (
 	errRequestStartTooHigh = errors.New("request start number is higher than our best block")
 
 	// chainSync errors
-	errNoPeerViews         = errors.New("unable to get target")
 	errNilBlockData        = errors.New("block data is nil")
 	errNilHeaderInResponse = errors.New("expected header, received none")
 	errNilBodyInResponse   = errors.New("expected body, received none")
diff --git a/dot/sync/peer_view.go b/dot/sync/peer_view.go
index 19c6c3d398..3a06122555 100644
--- a/dot/sync/peer_view.go
+++ b/dot/sync/peer_view.go
@@ -30,6 +30,10 @@ func (p *peerViewSet) getTarget() uint {
 	p.mtx.RLock()
 	defer p.mtx.RUnlock()
 
+	if len(p.view) == 0 {
+		return p.target
+	}
+
 	numbers := make([]uint, 0, len(p.view))
 	// we are going to sort the data and remove the outliers then we will return the avg of all the valid elements
 	for _, view := range maps.Values(p.view) {
 		numbers = append(numbers, view.number)

From 4d2458d75a1ae3dfd0a9159ff09409ff4c53c42b Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Tue, 16 Jan 2024 16:55:02 -0400
Subject: [PATCH 08/10] chore: fix bootnodes handshake

---
 dot/sync/chain_sync.go | 22 +++++++++------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go
index 0deaedb441..17a6a587ad 100644
--- a/dot/sync/chain_sync.go
+++ b/dot/sync/chain_sync.go
@@ -169,18 +169,14 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
 	}
 }
 
-func (cs *chainSync) retrieveTargetFromBootnodes() error {
+func (cs *chainSync) waitWorkersAndTarget() {
+	waitPeersTimer := time.NewTimer(cs.waitPeersDuration)
+
 	highestFinalizedHeader, err := cs.blockState.GetHighestFinalisedHeader()
 	if err != nil {
-		return err
+		panic(fmt.Sprintf("failed to get highest finalised header: %v", err))
 	}
 
-	return cs.network.BlockAnnounceHandshake(highestFinalizedHeader)
-}
-
-func (cs *chainSync) waitWorkersAndTarget() {
-	waitPeersTimer := time.NewTimer(cs.waitPeersDuration)
-
 	for {
 		cs.workerPool.useConnectedPeers()
 		totalAvailable := cs.workerPool.totalWorkers()
@@ -190,6 +186,11 @@ func (cs *chainSync) waitWorkersAndTarget() {
 			return
 		}
 
+		err := cs.network.BlockAnnounceHandshake(highestFinalizedHeader)
+		if err != nil && !errors.Is(err, network.ErrNoPeersConnected) {
+			logger.Errorf("retrieving target info from peers: %v", err)
+		}
+
 		select {
 		case <-waitPeersTimer.C:
 			waitPeersTimer.Reset(cs.waitPeersDuration)
 		case <-cs.stopCh:
 			return
 		}
@@ -207,11 +208,6 @@ func (cs *chainSync) start() {
 	cs.wg.Add(1)
 	go cs.pendingBlocks.run(cs.finalisedCh, cs.stopCh, &cs.wg)
 
-	err := cs.retrieveTargetFromBootnodes()
-	if err != nil {
-		logger.Errorf("failed to retrieve target from bootnodes: %v", err)
-	}
-
 	// wait until we have a minimal workers in the sync worker pool
 	cs.waitWorkersAndTarget()
 }
 
 func (cs *chainSync) stop() error {

From cd0a4e481e58d992ffde1fd96bd4c1125643af1d Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Wed, 17 Jan 2024 10:45:41 -0400
Subject: [PATCH 09/10] chore: uncomment test cases

---
 dot/sync/chain_sync_test.go | 112 ++++++++++++++++++------------------
 1 file changed, 56 insertions(+), 56 deletions(-)

diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go
index 36f18c35a8..d99afe8db6 100644
--- a/dot/sync/chain_sync_test.go
+++ b/dot/sync/chain_sync_test.go
@@ -63,7 +63,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
 	t.Parallel()
 
 	const somePeer = peer.ID("abc")
-	//errTest := errors.New("test error")
+	errTest := errors.New("test error")
 
 	emptyTrieState := storage.NewTrieState(trie.NewEmptyTrie())
 	block1AnnounceHeader := types.NewHeader(common.Hash{}, emptyTrieState.MustRoot(trie.NoMaxInlineValueSize),
 		common.Hash{}, 1, scale.VaryingDataTypeSlice{})
@@ -80,61 +80,61 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) {
 		errMessage          string
 		expectedSyncMode    chainSyncState
 	}{
-		// "announced_block_already_exists_in_disjoint_set": {
-		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
-		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(true)
-		// 		return &chainSync{
-		// 			stopCh:        make(chan struct{}),
-		// 			pendingBlocks: pendingBlocks,
-		// 			peerViewSet:   newPeerViewSet(0),
-		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-		// 		}
-		// 	},
-		// 	peerID:              somePeer,
-		// 	blockAnnounceHeader: block2AnnounceHeader,
-		// 	errWrapped:          errAlreadyInDisjointSet,
-		// 	errMessage: fmt.Sprintf("already in disjoint set: block %s (#%d)",
-		// 		block2AnnounceHeader.Hash(), block2AnnounceHeader.Number),
-		// },
-		// "failed_to_add_announced_block_in_disjoint_set": {
-		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
-		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
-		// 		pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(errTest)

-		// 		return &chainSync{
-		// 			stopCh:        make(chan struct{}),
-		// 			pendingBlocks: pendingBlocks,
-		// 			peerViewSet:   newPeerViewSet(0),
-		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-		// 		}
-		// 	},
-		// 	peerID:              somePeer,
-		// 	blockAnnounceHeader: block2AnnounceHeader,
-		// 	errWrapped:          errTest,
-		// 	errMessage:          "while adding pending block header: test error",
-		// },
-		// "announced_block_while_in_bootstrap_mode": {
-		// 	chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
-		// 		pendingBlocks := NewMockDisjointBlockSet(ctrl)
-		// 		pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
-		// 		pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(nil)

-		// 		state := atomic.Value{}
-		// 		state.Store(bootstrap)

-		// 		return &chainSync{
-		// 			stopCh:        make(chan struct{}),
-		// 			pendingBlocks: pendingBlocks,
-		// 			syncMode:      state,
-		// 			peerViewSet:   newPeerViewSet(0),
-		// 			workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
-		// 		}
-		// 	},
-		// 	peerID:              somePeer,
-		// 	blockAnnounceHeader: block2AnnounceHeader,
-		// },
+		"announced_block_already_exists_in_disjoint_set": {
+			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+				pendingBlocks := NewMockDisjointBlockSet(ctrl)
+				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(true)
+				return &chainSync{
+					stopCh:        make(chan struct{}),
+					pendingBlocks: pendingBlocks,
+					peerViewSet:   newPeerViewSet(0),
+					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+				}
+			},
+			peerID:              somePeer,
+			blockAnnounceHeader: block2AnnounceHeader,
+			errWrapped:          errAlreadyInDisjointSet,
+			errMessage: fmt.Sprintf("already in disjoint set: block %s (#%d)",
+				block2AnnounceHeader.Hash(), block2AnnounceHeader.Number),
+		},
+		"failed_to_add_announced_block_in_disjoint_set": {
+			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+				pendingBlocks := NewMockDisjointBlockSet(ctrl)
+				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
+				pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(errTest)
+
+				return &chainSync{
+					stopCh:        make(chan struct{}),
+					pendingBlocks: pendingBlocks,
+					peerViewSet:   newPeerViewSet(0),
+					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+				}
+			},
+			peerID:              somePeer,
+			blockAnnounceHeader: block2AnnounceHeader,
+			errWrapped:          errTest,
+			errMessage:          "while adding pending block header: test error",
+		},
+		"announced_block_while_in_bootstrap_mode": {
+			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
+				pendingBlocks := NewMockDisjointBlockSet(ctrl)
+				pendingBlocks.EXPECT().hasBlock(block2AnnounceHeader.Hash()).Return(false)
+				pendingBlocks.EXPECT().addHeader(block2AnnounceHeader).Return(nil)
+
+				state := atomic.Value{}
+				state.Store(bootstrap)
+
+				return &chainSync{
+					stopCh:        make(chan struct{}),
+					pendingBlocks: pendingBlocks,
+					syncMode:      state,
+					peerViewSet:   newPeerViewSet(0),
+					workerPool:    newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)),
+				}
+			},
+			peerID:              somePeer,
+			blockAnnounceHeader: block2AnnounceHeader,
+		},
 		"announced_block_while_in_tip_mode": {
 			chainSyncBuilder: func(ctrl *gomock.Controller) *chainSync {
 				pendingBlocksMock := NewMockDisjointBlockSet(ctrl)

From 4d596c3f6d5edd9f90e159f1e1bd7cbddc0e2f15 Mon Sep 17 00:00:00 2001
From: EclesioMeloJunior
Date: Wed, 17 Jan 2024 10:48:18 -0400
Subject: [PATCH 10/10] chore: address reviews

---
 dot/network/service.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dot/network/service.go b/dot/network/service.go
index 151c0c5ebb..6fc3445093 100644
--- a/dot/network/service.go
+++ b/dot/network/service.go
@@ -746,7 +746,7 @@ func (s *Service) startProcessingMsg() {
 
 func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
 	peers := s.host.peers()
-	if len(peers) < 1 {
+	if len(peers) == 0 {
 		return ErrNoPeersConnected
 	}
 
@@ -769,7 +769,7 @@ func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
 			defer wg.Done()
 			stream, err := s.sendHandshake(p, handshake, protocol)
 			if err != nil {
-				logger.Tracef("while sending block announce handshake: %v", err)
+				logger.Tracef("sending block announce handshake: %s", err)
 				return
 			}