diff --git a/dot/core/test_helpers.go b/dot/core/test_helpers.go index 3da5758d64..e5657c6a28 100644 --- a/dot/core/test_helpers.go +++ b/dot/core/test_helpers.go @@ -261,6 +261,8 @@ func (s *mockSyncer) IsSynced() bool { return false } +func (s *mockSyncer) SetSyncing(bool) {} + type mockDigestItem struct { //nolint i int } diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 54f4c7b413..dab4ccb26c 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -187,7 +187,7 @@ func TestService_Health(t *testing.T) { require.Equal(t, s.Health().IsSyncing, true) mockSync := s.syncer.(*mockSyncer) - mockSync.setSyncedState(true) + mockSync.SetSyncing(false) require.Equal(t, s.Health().IsSyncing, false) } diff --git a/dot/network/state.go b/dot/network/state.go index 2077ada3b3..77725bd7de 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -46,6 +46,8 @@ type Syncer interface { // IsSynced exposes the internal synced state // TODO: use syncQueue for this IsSynced() bool + + SetSyncing(bool) } // TransactionHandler is the interface used by the transactions sub-protocol diff --git a/dot/network/sync.go b/dot/network/sync.go index af0b02c3ab..a456e22df8 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -164,6 +164,8 @@ func (q *syncQueue) syncAtHead() { return } + q.s.syncer.SetSyncing(true) + for { select { // sleep for average block time TODO: make this configurable from slot duration @@ -183,6 +185,9 @@ func (q *syncQueue) syncAtHead() { continue } + logger.Info("set syncing to false") + q.s.syncer.SetSyncing(false) + // we have received new blocks since the last check, sleep if prev.Number.Int64() < curr.Number.Int64() { prev = curr @@ -192,10 +197,7 @@ func (q *syncQueue) syncAtHead() { prev = curr start := uint64(curr.Number.Int64()) + 1 logger.Debug("haven't received new blocks since last check, pushing request", "start", start) - q.requestData.Store(start, requestData{ - sent: true, - received: false, - }) + q.requestData.Delete(start) q.pushRequest(start, 1, "") } } @@ -248,7 +250,12 @@ func (q *syncQueue) handleResponseQueue() { // prune peers with low score and connect to new peers func (q *syncQueue) prunePeers() { for { - time.Sleep(time.Second * 30) + select { + case <-time.After(time.Second * 30): + case <-q.ctx.Done(): + return + } + logger.Debug("✂️ pruning peers w/ low score...") peers := q.getSortedPeers() diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index 17df3c7a10..03851ac325 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -65,8 +65,8 @@ func (s *mockSyncer) IsSynced() bool { return s.synced } -func (s *mockSyncer) setSyncedState(newState bool) { - s.synced = newState +func (s *mockSyncer) SetSyncing(syncing bool) { + s.synced = !syncing } type testStreamHandler struct { diff --git a/dot/node.go b/dot/node.go index 53ef7103c4..34d0abcdcb 100644 --- a/dot/node.go +++ b/dot/node.go @@ -22,6 +22,7 @@ import ( "os" "os/signal" "path" + "runtime/debug" "sync" "syscall" @@ -158,6 +159,13 @@ func NodeInitialized(basepath string, expected bool) bool { // NewNode creates a new dot node from a dot node configuration func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, error) { + // set garbage collection percent to 10% + // can be overwritten by setting the GOGC env veriable, which defaults to 100 + prev := debug.SetGCPercent(10) + if prev != 100 { + debug.SetGCPercent(prev) + } + setupLogger(cfg) // if authority node, should have at least 1 key in keystore diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index a9d6c207af..08e3512faf 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -63,6 +63,8 @@ func (s *mockSyncer) IsSynced() bool { return false } +func (s *mockSyncer) SetSyncing(_ bool) {} + type mockBlockState struct{} func (s *mockBlockState) BestBlockHeader() (*types.Header, error) { diff --git a/dot/state/storage.go b/dot/state/storage.go index 30659447c1..73bea66666 100644 --- a/dot/state/storage.go +++ b/dot/state/storage.go @@ -52,6 +52,8 @@ type StorageState struct { // change notifiers changedLock sync.RWMutex subscriptions map[byte]*StorageSubscription + + syncing bool } // NewStorageState creates a new StorageState backed by the given trie and database located at basePath. @@ -76,6 +78,11 @@ func NewStorageState(db chaindb.Database, blockState *BlockState, t *trie.Trie) }, nil } +// SetSyncing sets whether the node is currently syncing or not +func (s *StorageState) SetSyncing(syncing bool) { + s.syncing = syncing +} + func (s *StorageState) pruneKey(keyHeader *types.Header) { s.lock.Lock() defer s.lock.Unlock() @@ -93,6 +100,12 @@ func (s *StorageState) pruneKey(keyHeader *types.Header) { func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error { s.lock.Lock() root := ts.MustRoot() + if s.syncing { + // keep only the trie at the head of the chain when syncing + for key := range s.tries { + delete(s.tries, key) + } + } s.tries[root] = ts.Trie() s.lock.Unlock() @@ -108,6 +121,7 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error { logger.Warn("failed to notify storage subscriptions", "error", err) } }() + return nil } diff --git a/dot/state/storage_test.go b/dot/state/storage_test.go index 5be980bea2..f6601eebdf 100644 --- a/dot/state/storage_test.go +++ b/dot/state/storage_test.go @@ -143,3 +143,33 @@ func TestStorage_LoadFromDB(t *testing.T) { require.NoError(t, err) require.Equal(t, 3, len(entries)) } + +func TestStorage_StoreTrie_Syncing(t *testing.T) { + storage := newTestStorageState(t) + ts, err := storage.TrieState(&trie.EmptyHash) + require.NoError(t, err) + + key := []byte("testkey") + value := []byte("testvalue") + ts.Set(key, value) + + storage.SetSyncing(true) + err = storage.StoreTrie(ts) + require.NoError(t, err) + require.Equal(t, 1, len(storage.tries)) +} + +func TestStorage_StoreTrie_NotSyncing(t *testing.T) { + storage := newTestStorageState(t) + ts, err := storage.TrieState(&trie.EmptyHash) + require.NoError(t, err) + + key := []byte("testkey") + value := []byte("testvalue") + ts.Set(key, value) + + storage.SetSyncing(false) + err = storage.StoreTrie(ts) + require.NoError(t, err) + require.Equal(t, 2, len(storage.tries)) +} diff --git a/dot/sync/interface.go b/dot/sync/interface.go index 2500d0df5d..1a457091e2 100644 --- a/dot/sync/interface.go +++ b/dot/sync/interface.go @@ -52,6 +52,7 @@ type StorageState interface { TrieState(root *common.Hash) (*rtstorage.TrieState, error) StoreTrie(ts *rtstorage.TrieState) error LoadCodeHash(*common.Hash) (common.Hash, error) + SetSyncing(bool) } // TransactionState is the interface for transaction queue methods diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index ce0df185a3..c5a96dd23c 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -411,3 +411,9 @@ func (s *Service) handleDigests(header *types.Header) { func (s *Service) IsSynced() bool { return s.synced } + +// SetSyncing sets whether the node is currently syncing or not +func (s *Service) SetSyncing(syncing bool) { + s.synced = !syncing + s.storageState.SetSyncing(syncing) +} diff --git a/tests/rpc/rpc_01-system_test.go b/tests/rpc/rpc_01-system_test.go index 48c4d2ab1d..b64641b1a0 100644 --- a/tests/rpc/rpc_01-system_test.go +++ b/tests/rpc/rpc_01-system_test.go @@ -60,7 +60,7 @@ func TestSystemRPC(t *testing.T) { expected: modules.SystemHealthResponse{ Peers: 2, - IsSyncing: false, + IsSyncing: true, ShouldHavePeers: true, }, params: "{}",