From 15d951ae0b34f847115ac345baf981fc05ba1eea Mon Sep 17 00:00:00 2001 From: "Quentin McGaw (desktop)" Date: Thu, 10 Feb 2022 14:30:04 +0000 Subject: [PATCH] fix(dot/state): inject mutex protected tries to states --- dot/state/block.go | 14 ++++++-------- dot/state/block_finalisation.go | 6 ++++-- dot/state/block_race_test.go | 2 +- dot/state/block_test.go | 2 +- dot/state/initialize.go | 6 ++++-- dot/state/offline_pruner.go | 6 ++++-- dot/state/service.go | 9 ++++----- dot/state/storage.go | 24 +----------------------- dot/state/storage_test.go | 10 +++++++--- lib/grandpa/grandpa_test.go | 2 +- 10 files changed, 33 insertions(+), 48 deletions(-) diff --git a/dot/state/block.go b/dot/state/block.go index e31d8f40c9..15edecac87 100644 --- a/dot/state/block.go +++ b/dot/state/block.go @@ -27,8 +27,7 @@ import ( ) const ( - pruneKeyBufferSize = 1000 - blockPrefix = "block" + blockPrefix = "block" ) var ( @@ -60,6 +59,7 @@ type BlockState struct { genesisHash common.Hash lastFinalised common.Hash unfinalisedBlocks *sync.Map // map[common.Hash]*types.Block + tries *tries // block notifiers imported map[chan *types.Block]struct{} @@ -69,21 +69,19 @@ type BlockState struct { runtimeUpdateSubscriptionsLock sync.RWMutex runtimeUpdateSubscriptions map[uint32]chan<- runtime.Version - pruneKeyCh chan *types.Header - telemetry telemetry.Client } // NewBlockState will create a new BlockState backed by the database located at basePath -func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState, error) { +func NewBlockState(db chaindb.Database, tries *tries, telemetry telemetry.Client) (*BlockState, error) { bs := &BlockState{ dbPath: db.Path(), baseState: NewBaseState(db), db: chaindb.NewTable(db, blockPrefix), unfinalisedBlocks: new(sync.Map), + tries: tries, imported: make(map[chan *types.Block]struct{}), finalised: make(map[chan *types.FinalisationInfo]struct{}), - pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize), runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version), telemetry: telemetry, } @@ -107,16 +105,16 @@ func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState // NewBlockStateFromGenesis initialises a BlockState from a genesis header, // saving it to the database located at basePath -func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header, +func NewBlockStateFromGenesis(db chaindb.Database, tries *tries, header *types.Header, telemetryMailer telemetry.Client) (*BlockState, error) { bs := &BlockState{ bt: blocktree.NewBlockTreeFromRoot(header), baseState: NewBaseState(db), db: chaindb.NewTable(db, blockPrefix), unfinalisedBlocks: new(sync.Map), + tries: tries, imported: make(map[chan *types.Block]struct{}), finalised: make(map[chan *types.FinalisationInfo]struct{}), - pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize), runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version), genesisHash: header.Hash(), lastFinalised: header.Hash(), diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index ac19f469bc..945863eaed 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -151,8 +151,9 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er continue } + bs.tries.delete(hash) + logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash) - bs.pruneKeyCh <- &block.Header } // if nothing was previously finalised, set the first slot of the network to the @@ -238,8 +239,9 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error { continue } + bs.tries.delete(hash) + logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash) - bs.pruneKeyCh <- &block.Header } return batch.Flush() diff --git a/dot/state/block_race_test.go b/dot/state/block_race_test.go index 1f1284556c..373c961be1 100644 --- a/dot/state/block_race_test.go +++ b/dot/state/block_race_test.go @@ -34,7 +34,7 @@ func TestConcurrencySetHeader(t *testing.T) { go func(index int) { defer pend.Done() - bs, err := NewBlockStateFromGenesis(dbs[index], testGenesisHeader, telemetryMock) + bs, err := NewBlockStateFromGenesis(dbs[index], nil, testGenesisHeader, telemetryMock) require.NoError(t, err) header := &types.Header{ diff --git a/dot/state/block_test.go b/dot/state/block_test.go index 50e6513f5f..cdd521186f 100644 --- a/dot/state/block_test.go +++ b/dot/state/block_test.go @@ -35,7 +35,7 @@ func newTestBlockState(t *testing.T, header *types.Header) *BlockState { header = testGenesisHeader } - bs, err := NewBlockStateFromGenesis(db, header, telemetryMock) + bs, err := NewBlockStateFromGenesis(db, nil, header, telemetryMock) require.NoError(t, err) return bs } diff --git a/dot/state/initialize.go b/dot/state/initialize.go index 782ebe89e7..a0cfb63498 100644 --- a/dot/state/initialize.go +++ b/dot/state/initialize.go @@ -62,14 +62,16 @@ func (s *Service) Initialise(gen *genesis.Genesis, header *types.Header, t *trie return fmt.Errorf("failed to write genesis values to database: %s", err) } + tries := newTries(t) + // create block state from genesis block - blockState, err := NewBlockStateFromGenesis(db, header, s.Telemetry) + blockState, err := NewBlockStateFromGenesis(db, tries, header, s.Telemetry) if err != nil { return fmt.Errorf("failed to create block state from genesis: %s", err) } // create storage state from genesis trie - storageState, err := NewStorageState(db, blockState, t, pruner.Config{}) + storageState, err := NewStorageState(db, blockState, tries, pruner.Config{}) if err != nil { return fmt.Errorf("failed to create storage state from trie: %s", err) } diff --git a/dot/state/offline_pruner.go b/dot/state/offline_pruner.go index eedc26d7ec..cc0b0bf49a 100644 --- a/dot/state/offline_pruner.go +++ b/dot/state/offline_pruner.go @@ -41,9 +41,11 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64, return nil, fmt.Errorf("failed to load DB %w", err) } + tries := newTries(trie.NewEmptyTrie()) + // create blockState state // NewBlockState on pruner execution does not use telemetry - blockState, err := NewBlockState(db, nil) + blockState, err := NewBlockState(db, tries, nil) if err != nil { return nil, fmt.Errorf("failed to create block state: %w", err) } @@ -60,7 +62,7 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64, } // load storage state - storageState, err := NewStorageState(db, blockState, trie.NewEmptyTrie(), pruner.Config{}) + storageState, err := NewStorageState(db, blockState, tries, pruner.Config{}) if err != nil { return nil, fmt.Errorf("failed to create new storage state %w", err) } diff --git a/dot/state/service.go b/dot/state/service.go index 48c03f99b7..dc65673425 100644 --- a/dot/state/service.go +++ b/dot/state/service.go @@ -114,9 +114,11 @@ func (s *Service) Start() error { return nil } + tries := newTries(trie.NewEmptyTrie()) + var err error // create block state - s.Block, err = NewBlockState(s.db, s.Telemetry) + s.Block, err = NewBlockState(s.db, tries, s.Telemetry) if err != nil { return fmt.Errorf("failed to create block state: %w", err) } @@ -136,7 +138,7 @@ func (s *Service) Start() error { } // create storage state - s.Storage, err = NewStorageState(s.db, s.Block, trie.NewEmptyTrie(), pr) + s.Storage, err = NewStorageState(s.db, s.Block, tries, pr) if err != nil { return fmt.Errorf("failed to create storage state: %w", err) } @@ -167,9 +169,6 @@ func (s *Service) Start() error { ", highest number " + num.String() + " and genesis hash " + s.Block.genesisHash.String()) - // Start background goroutine to GC pruned keys. - go s.Storage.pruneStorage(s.closeCh) - return nil } diff --git a/dot/state/storage.go b/dot/state/storage.go index 94772187df..b4ac708245 100644 --- a/dot/state/storage.go +++ b/dot/state/storage.go @@ -43,17 +43,11 @@ type StorageState struct { // NewStorageState creates a new StorageState backed by the given trie and database located at basePath. func NewStorageState(db chaindb.Database, blockState *BlockState, - t *trie.Trie, onlinePruner pruner.Config) (*StorageState, error) { + tries *tries, onlinePruner pruner.Config) (*StorageState, error) { if db == nil { return nil, fmt.Errorf("cannot have nil database") } - if t == nil { - return nil, fmt.Errorf("cannot have nil trie") - } - - tries := newTries(t) - storageTable := chaindb.NewTable(db, storagePrefix) var p pruner.Pruner @@ -76,11 +70,6 @@ func NewStorageState(db chaindb.Database, blockState *BlockState, }, nil } -func (s *StorageState) pruneKey(keyHeader *types.Header) { - logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash()) - s.tries.delete(keyHeader.StateRoot) -} - // StoreTrie stores the given trie in the StorageState and writes it to the database func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header) error { root := ts.MustRoot() @@ -314,14 +303,3 @@ func (s *StorageState) LoadCodeHash(hash *common.Hash) (common.Hash, error) { func (s *StorageState) GenerateTrieProof(stateRoot common.Hash, keys [][]byte) ([][]byte, error) { return trie.GenerateProof(stateRoot[:], keys, s.db) } - -func (s *StorageState) pruneStorage(closeCh chan interface{}) { - for { - select { - case key := <-s.blockState.pruneKeyCh: - s.pruneKey(key) - case <-closeCh: - return - } - } -} diff --git a/dot/state/storage_test.go b/dot/state/storage_test.go index b455a66231..8486c2c56e 100644 --- a/dot/state/storage_test.go +++ b/dot/state/storage_test.go @@ -25,7 +25,9 @@ func newTestStorageState(t *testing.T) *StorageState { db := NewInMemoryDB(t) bs := newTestBlockState(t, testGenesisHeader) - s, err := NewStorageState(db, bs, trie.NewEmptyTrie(), pruner.Config{}) + tries := newTries(trie.NewEmptyTrie()) + + s, err := NewStorageState(db, bs, tries, pruner.Config{}) require.NoError(t, err) return s } @@ -179,7 +181,7 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) { "0", )) - blockState, err := NewBlockStateFromGenesis(db, genHeader, telemetryMock) + blockState, err := NewBlockStateFromGenesis(db, nil, genHeader, telemetryMock) require.NoError(t, err) testChildTrie := trie.NewEmptyTrie() @@ -188,7 +190,9 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) { err = genTrie.PutChild([]byte("keyToChild"), testChildTrie) require.NoError(t, err) - storage, err := NewStorageState(db, blockState, genTrie, pruner.Config{}) + tries := newTries(genTrie) + + storage, err := NewStorageState(db, blockState, tries, pruner.Config{}) require.NoError(t, err) trieState, err := runtime.NewTrieState(genTrie) diff --git a/lib/grandpa/grandpa_test.go b/lib/grandpa/grandpa_test.go index f52c86f7a1..b09e803ad2 100644 --- a/lib/grandpa/grandpa_test.go +++ b/lib/grandpa/grandpa_test.go @@ -60,7 +60,7 @@ func newTestState(t *testing.T) *state.Service { t.Cleanup(func() { db.Close() }) _, genTrie, _ := genesis.NewTestGenesisWithTrieAndHeader(t) - block, err := state.NewBlockStateFromGenesis(db, testGenesisHeader, telemetryMock) + block, err := state.NewBlockStateFromGenesis(db, nil, testGenesisHeader, telemetryMock) require.NoError(t, err) rtCfg := &wasmer.Config{}