From 5f3643f8f4bd4921349b9743f7492a70bfad9e23 Mon Sep 17 00:00:00 2001 From: noot <36753753+noot@users.noreply.github.com> Date: Wed, 13 Oct 2021 15:27:25 +0200 Subject: [PATCH] feature(dot/sync): implement `*tipSyncer.hasCurrentWorker`, add minPeers, slotDuration to `chainSync`, check if syncing in `*core.Service.HandleTransactionMessage` (#1881) --- dot/core/interface.go | 1 + dot/core/messages.go | 5 ++ dot/core/mocks/{network.go => Network.go} | 16 +++++- dot/core/test_helpers.go | 36 ++----------- dot/network/service.go | 5 ++ dot/services.go | 7 +++ dot/sync/chain_sync.go | 15 +++--- dot/sync/chain_sync_test.go | 8 ++- dot/sync/syncer.go | 5 +- dot/sync/tip_syncer.go | 45 +++++++++++++++- dot/sync/tip_syncer_test.go | 65 +++++++++++++++++++++++ 11 files changed, 164 insertions(+), 44 deletions(-) rename dot/core/mocks/{network.go => Network.go} (56%) diff --git a/dot/core/interface.go b/dot/core/interface.go index 8c5710b097..f410eb5450 100644 --- a/dot/core/interface.go +++ b/dot/core/interface.go @@ -79,6 +79,7 @@ type TransactionState interface { // Network is the interface for the network service type Network interface { GossipMessage(network.NotificationsMessage) + IsSynced() bool } // EpochState is the interface for state.EpochState diff --git a/dot/core/messages.go b/dot/core/messages.go index 8cf9f2ca5d..1c7572e188 100644 --- a/dot/core/messages.go +++ b/dot/core/messages.go @@ -28,6 +28,11 @@ import ( func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (bool, error) { logger.Debug("received TransactionMessage") + if !s.net.IsSynced() { + logger.Debug("ignoring TransactionMessage, not yet synced") + return false, nil + } + // get transactions from message extrinsics txs := msg.Extrinsics var toPropagate []types.Extrinsic diff --git a/dot/core/mocks/network.go b/dot/core/mocks/Network.go similarity index 56% rename from dot/core/mocks/network.go rename to dot/core/mocks/Network.go index 314c19b80a..9bc4564527 100644 --- a/dot/core/mocks/network.go +++ b/dot/core/mocks/Network.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.8.0. DO NOT EDIT. +// Code generated by mockery v2.9.4. DO NOT EDIT. package mocks @@ -16,3 +16,17 @@ type MockNetwork struct { func (_m *MockNetwork) GossipMessage(_a0 network.NotificationsMessage) { _m.Called(_a0) } + +// IsSynced provides a mock function with given fields: +func (_m *MockNetwork) IsSynced() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} diff --git a/dot/core/test_helpers.go b/dot/core/test_helpers.go index 9c4c1426fc..a0d2ff0f6f 100644 --- a/dot/core/test_helpers.go +++ b/dot/core/test_helpers.go @@ -124,15 +124,10 @@ func NewTestService(t *testing.T, cfg *Config) *Service { cfg.BlockState.StoreRuntime(cfg.BlockState.BestBlockHash(), cfg.Runtime) if cfg.Network == nil { - config := &network.Config{ - BasePath: testDatadirPath, - Port: 7001, - NoBootstrap: true, - NoMDNS: true, - BlockState: stateSrvc.Block, - TransactionHandler: network.NewMockTransactionHandler(), - } - cfg.Network = createTestNetworkService(t, config) + net := new(coremocks.MockNetwork) + net.On("GossipMessage", mock.AnythingOfType("*network.TransactionMessage")) + net.On("IsSynced").Return(true) + cfg.Network = net } if cfg.CodeSubstitutes == nil { @@ -160,26 +155,3 @@ func NewTestService(t *testing.T, cfg *Config) *Service { return s } - -// helper method to create and start a new network service -func createTestNetworkService(t *testing.T, cfg *network.Config) (srvc *network.Service) { - if cfg.LogLvl == 0 { - cfg.LogLvl = 3 - } - - if cfg.Syncer == nil { - cfg.Syncer = network.NewMockSyncer() - } - - srvc, err := network.NewService(cfg) - require.NoError(t, err) - - err = srvc.Start() - require.NoError(t, err) - - t.Cleanup(func() { - err := srvc.Stop() - require.NoError(t, err) - }) - return srvc -} diff --git a/dot/network/service.go b/dot/network/service.go index c3ca3e9564..42a30af9f5 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -738,3 +738,8 @@ func (*Service) StartingBlock() int64 { // TODO: refactor this to get the data from the sync service return 0 } + +// IsSynced returns whether we are synced (no longer in bootstrap mode) or not +func (s *Service) IsSynced() bool { + return s.syncer.IsSynced() +} diff --git a/dot/services.go b/dot/services.go index 0895c8ec10..62b41bf2a6 100644 --- a/dot/services.go +++ b/dot/services.go @@ -407,6 +407,11 @@ func createBlockVerifier(st *state.Service) (*babe.VerificationManager, error) { } func newSyncService(cfg *Config, st *state.Service, fg sync.FinalityGadget, verifier *babe.VerificationManager, cs *core.Service, net *network.Service) (*sync.Service, error) { + slotDuration, err := st.Epoch.GetSlotDuration() + if err != nil { + return nil, err + } + syncCfg := &sync.Config{ LogLvl: cfg.Log.SyncLvl, Network: net, @@ -416,6 +421,8 @@ func newSyncService(cfg *Config, st *state.Service, fg sync.FinalityGadget, veri FinalityGadget: fg, BabeVerifier: verifier, BlockImportHandler: cs, + MinPeers: cfg.Network.MinPeers, + SlotDuration: slotDuration, } return sync.NewService(syncCfg) diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index f54921e0cf..617627ab0b 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -155,9 +155,12 @@ type chainSync struct { benchmarker *syncBenchmarker finalisedCh <-chan *types.FinalisationInfo + + minPeers int + slotDuration time.Duration } -func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBlocks DisjointBlockSet) *chainSync { +func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBlocks DisjointBlockSet, minPeers int, slotDuration time.Duration) *chainSync { ctx, cancel := context.WithCancel(context.Background()) return &chainSync{ ctx: ctx, @@ -175,17 +178,18 @@ func newChainSync(bs BlockState, net Network, readyBlocks *blockQueue, pendingBl handler: newBootstrapSyncer(bs), benchmarker: newSyncBenchmarker(), finalisedCh: bs.GetFinalisedNotifierChannel(), + minPeers: minPeers, + slotDuration: slotDuration, } } func (cs *chainSync) start() { - // wait until we have received 1+ peer heads - // TODO: this should be based off our min/max peers + // wait until we have received at least `minPeers` peer heads for { cs.RLock() n := len(cs.peerState) cs.RUnlock() - if n >= 1 { + if n >= cs.minPeers { break } time.Sleep(time.Millisecond * 100) @@ -369,8 +373,7 @@ func (cs *chainSync) ignorePeer(who peer.ID) { func (cs *chainSync) sync() { // set to slot time - // TODO: make configurable - ticker := time.NewTicker(time.Second * 6) + ticker := time.NewTicker(cs.slotDuration) for { select { diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index 0f84f531c9..ef215e69b6 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -36,7 +36,11 @@ import ( "github.com/stretchr/testify/require" ) -var testTimeout = time.Second * 5 +const ( + defaultMinPeers = 1 + testTimeout = time.Second * 5 + defaultSlotDuration = time.Second * 6 +) func newTestChainSync(t *testing.T) (*chainSync, *blockQueue) { header, err := types.NewHeader(common.NewHash([]byte{0}), trie.EmptyHash, trie.EmptyHash, big.NewInt(0), types.NewDigest()) @@ -51,7 +55,7 @@ func newTestChainSync(t *testing.T) (*chainSync, *blockQueue) { net.On("DoBlockRequest", mock.AnythingOfType("peer.ID"), mock.AnythingOfType("*network.BlockRequestMessage")).Return(nil, nil) readyBlocks := newBlockQueue(maxResponseSize) - cs := newChainSync(bs, net, readyBlocks, newDisjointBlockSet(pendingBlocksLimit)) + cs := newChainSync(bs, net, readyBlocks, newDisjointBlockSet(pendingBlocksLimit), defaultMinPeers, defaultSlotDuration) return cs, readyBlocks } diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index c0da9d5d70..9c4330f45b 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -19,6 +19,7 @@ package sync import ( "math/big" "os" + "time" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/types" @@ -46,6 +47,8 @@ type Config struct { TransactionState TransactionState BlockImportHandler BlockImportHandler BabeVerifier BabeVerifier + MinPeers int + SlotDuration time.Duration } // NewService returns a new *sync.Service @@ -84,7 +87,7 @@ func NewService(cfg *Config) (*Service, error) { readyBlocks := newBlockQueue(maxResponseSize * 30) pendingBlocks := newDisjointBlockSet(pendingBlocksLimit) - chainSync := newChainSync(cfg.BlockState, cfg.Network, readyBlocks, pendingBlocks) + chainSync := newChainSync(cfg.BlockState, cfg.Network, readyBlocks, pendingBlocks, cfg.MinPeers, cfg.SlotDuration) chainProcessor := newChainProcessor(readyBlocks, pendingBlocks, cfg.BlockState, cfg.StorageState, cfg.TransactionState, cfg.BabeVerifier, cfg.FinalityGadget, cfg.BlockImportHandler) return &Service{ diff --git a/dot/sync/tip_syncer.go b/dot/sync/tip_syncer.go index 0cce59c7ab..79bde3c011 100644 --- a/dot/sync/tip_syncer.go +++ b/dot/sync/tip_syncer.go @@ -109,8 +109,49 @@ func (s *tipSyncer) handleWorkerResult(res *worker) (*worker, error) { }, nil } -func (*tipSyncer) hasCurrentWorker(_ *worker, _ map[uint64]*worker) bool { - // TODO +func (*tipSyncer) hasCurrentWorker(w *worker, workers map[uint64]*worker) bool { + if w == nil || w.startNumber == nil || w.targetNumber == nil { + return true + } + + for _, curr := range workers { + if w.direction != curr.direction || w.requestData != curr.requestData { + continue + } + + targetDiff := w.targetNumber.Cmp(curr.targetNumber) + startDiff := w.startNumber.Cmp(curr.startNumber) + + switch w.direction { + case network.Ascending: + // worker target is greater than existing worker's target + if targetDiff > 0 { + continue + } + + // worker start is less than existing worker's start + if startDiff < 0 { + continue + } + case network.Descending: + // worker target is less than existing worker's target + if targetDiff < 0 { + continue + } + + // worker start is greater than existing worker's start + if startDiff > 0 { + continue + } + } + + // worker (start, end) is within curr (start, end), if hashes are equal then the request is either + // for the same data or some subset of data that is covered by curr + if w.startHash.Equal(curr.startHash) || w.targetHash.Equal(curr.targetHash) { + return true + } + } + return false } diff --git a/dot/sync/tip_syncer_test.go b/dot/sync/tip_syncer_test.go index 3b1da13a88..ebef24465d 100644 --- a/dot/sync/tip_syncer_test.go +++ b/dot/sync/tip_syncer_test.go @@ -279,3 +279,68 @@ func TestTipSyncer_handleTick_case3(t *testing.T) { s.readyBlocks.pop() // first pop will remove parent require.Equal(t, block.ToBlockData(), s.readyBlocks.pop()) } + +func TestTipSyncer_hasCurrentWorker(t *testing.T) { + s := newTestTipSyncer(t) + require.False(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(0), + targetNumber: big.NewInt(0), + }, nil)) + + workers := make(map[uint64]*worker) + workers[0] = &worker{ + startNumber: big.NewInt(1), + targetNumber: big.NewInt(128), + } + require.False(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(1), + targetNumber: big.NewInt(129), + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(1), + targetNumber: big.NewInt(128), + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(1), + targetNumber: big.NewInt(127), + }, workers)) + + workers[0] = &worker{ + startNumber: big.NewInt(128), + targetNumber: big.NewInt(255), + } + require.False(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(127), + targetNumber: big.NewInt(255), + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(128), + targetNumber: big.NewInt(255), + }, workers)) + + workers[0] = &worker{ + startNumber: big.NewInt(128), + targetNumber: big.NewInt(1), + direction: network.Descending, + } + require.False(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(129), + targetNumber: big.NewInt(1), + direction: network.Descending, + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(128), + targetNumber: big.NewInt(1), + direction: network.Descending, + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(128), + targetNumber: big.NewInt(2), + direction: network.Descending, + }, workers)) + require.True(t, s.hasCurrentWorker(&worker{ + startNumber: big.NewInt(127), + targetNumber: big.NewInt(1), + direction: network.Descending, + }, workers)) +}