Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/state,dot/network): improve memory usage when syncing #1491

Merged
merged 13 commits into from
Mar 25, 2021
2 changes: 2 additions & 0 deletions dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ func (s *mockSyncer) IsSynced() bool {
return false
}

func (s *mockSyncer) SetSyncing(bool) {}

type mockDigestItem struct { //nolint
i int
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestService_Health(t *testing.T) {
require.Equal(t, s.Health().IsSyncing, true)
mockSync := s.syncer.(*mockSyncer)

mockSync.setSyncedState(true)
mockSync.SetSyncing(false)
require.Equal(t, s.Health().IsSyncing, false)
}

Expand Down
2 changes: 2 additions & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Syncer interface {

// IsSynced exposes the internal synced state // TODO: use syncQueue for this
IsSynced() bool

SetSyncing(bool)
}

// TransactionHandler is the interface used by the transactions sub-protocol
Expand Down
17 changes: 12 additions & 5 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func (q *syncQueue) syncAtHead() {
return
}

q.s.syncer.SetSyncing(true)

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
Expand All @@ -183,6 +185,9 @@ func (q *syncQueue) syncAtHead() {
continue
}

logger.Info("set syncing to false")
q.s.syncer.SetSyncing(false)

// we have received new blocks since the last check, sleep
if prev.Number.Int64() < curr.Number.Int64() {
prev = curr
Expand All @@ -192,10 +197,7 @@ func (q *syncQueue) syncAtHead() {
prev = curr
start := uint64(curr.Number.Int64()) + 1
logger.Debug("haven't received new blocks since last check, pushing request", "start", start)
q.requestData.Store(start, requestData{
sent: true,
received: false,
})
q.requestData.Delete(start)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what's happening here. It seems we're deleting start from the requestData map, but the pushing it back on the map? It seems to be working, but I don't fully understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the requestData tracks requests that were sent and received, in the case we are at the head it seems like often we will send out a request and receive an empty response if the node doesn't have info for that block yet. the request processor ignores requests that have already gotten a response, so we need to delete the info for it that says we got a request first. I noticed the node would go into a loop of trying to send a request but thinking it already got the response and thus never syncing

q.pushRequest(start, 1, "")
}
}
Expand Down Expand Up @@ -248,7 +250,12 @@ func (q *syncQueue) handleResponseQueue() {
// prune peers with low score and connect to new peers
func (q *syncQueue) prunePeers() {
for {
time.Sleep(time.Second * 30)
select {
case <-time.After(time.Second * 30):
case <-q.ctx.Done():
return
}

logger.Debug("✂️ pruning peers w/ low score...")

peers := q.getSortedPeers()
Expand Down
4 changes: 2 additions & 2 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (s *mockSyncer) IsSynced() bool {
return s.synced
}

func (s *mockSyncer) setSyncedState(newState bool) {
s.synced = newState
func (s *mockSyncer) SetSyncing(syncing bool) {
s.synced = !syncing
}

type testStreamHandler struct {
Expand Down
8 changes: 8 additions & 0 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"path"
"runtime/debug"
"sync"
"syscall"

Expand Down Expand Up @@ -158,6 +159,13 @@ func NodeInitialized(basepath string, expected bool) bool {

// NewNode creates a new dot node from a dot node configuration
func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, error) {
// set garbage collection percent to 10%
// can be overwritten by setting the GOGC env veriable, which defaults to 100
prev := debug.SetGCPercent(10)
if prev != 100 {
debug.SetGCPercent(prev)
}

setupLogger(cfg)

// if authority node, should have at least 1 key in keystore
Expand Down
2 changes: 2 additions & 0 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (s *mockSyncer) IsSynced() bool {
return false
}

func (s *mockSyncer) SetSyncing(_ bool) {}

type mockBlockState struct{}

func (s *mockBlockState) BestBlockHeader() (*types.Header, error) {
Expand Down
14 changes: 14 additions & 0 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type StorageState struct {
// change notifiers
changedLock sync.RWMutex
subscriptions map[byte]*StorageSubscription

syncing bool
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
Expand All @@ -76,6 +78,11 @@ func NewStorageState(db chaindb.Database, blockState *BlockState, t *trie.Trie)
}, nil
}

// SetSyncing sets whether the node is currently syncing or not
func (s *StorageState) SetSyncing(syncing bool) {
s.syncing = syncing
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -93,6 +100,12 @@ func (s *StorageState) pruneKey(keyHeader *types.Header) {
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
s.lock.Lock()
root := ts.MustRoot()
if s.syncing {
// keep only the trie at the head of the chain when syncing
for key := range s.tries {
delete(s.tries, key)
}
}
s.tries[root] = ts.Trie()
s.lock.Unlock()

Expand All @@ -108,6 +121,7 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
logger.Warn("failed to notify storage subscriptions", "error", err)
}
}()

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,33 @@ func TestStorage_LoadFromDB(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(entries))
}

func TestStorage_StoreTrie_Syncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(true)
err = storage.StoreTrie(ts)
require.NoError(t, err)
require.Equal(t, 1, len(storage.tries))
}

func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(false)
err = storage.StoreTrie(ts)
require.NoError(t, err)
require.Equal(t, 2, len(storage.tries))
}
1 change: 1 addition & 0 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type StorageState interface {
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
StoreTrie(ts *rtstorage.TrieState) error
LoadCodeHash(*common.Hash) (common.Hash, error)
SetSyncing(bool)
}

// TransactionState is the interface for transaction queue methods
Expand Down
6 changes: 6 additions & 0 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,9 @@ func (s *Service) handleDigests(header *types.Header) {
func (s *Service) IsSynced() bool {
return s.synced
}

// SetSyncing sets whether the node is currently syncing or not
func (s *Service) SetSyncing(syncing bool) {
s.synced = !syncing
s.storageState.SetSyncing(syncing)
}