Skip to content

Commit

Permalink
chore(dot/state): replace sync.Map with map+mutex (#2286)
Browse files Browse the repository at this point in the history
  • Loading branch information
qdm12 committed Mar 3, 2022
1 parent e1345da commit 666795b
Show file tree
Hide file tree
Showing 4 changed files with 445 additions and 64 deletions.
71 changes: 17 additions & 54 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type BlockState struct {
sync.RWMutex
genesisHash common.Hash
lastFinalised common.Hash
unfinalisedBlocks *sync.Map // map[common.Hash]*types.Block
unfinalisedBlocks *hashToBlockMap
tries *Tries

// block notifiers
Expand All @@ -78,7 +78,7 @@ func NewBlockState(db chaindb.Database, trs *Tries, telemetry telemetry.Client)
dbPath: db.Path(),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
unfinalisedBlocks: newHashToBlockMap(),
tries: trs,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
Expand Down Expand Up @@ -106,12 +106,12 @@ func NewBlockState(db chaindb.Database, trs *Tries, telemetry telemetry.Client)
// NewBlockStateFromGenesis initialises a BlockState from a genesis header,
// saving it to the database located at basePath
func NewBlockStateFromGenesis(db chaindb.Database, trs *Tries, header *types.Header,
telemetryMailer telemetry.Client) (*BlockState, error) { // TODO CHECKTEST
telemetryMailer telemetry.Client) (*BlockState, error) {
bs := &BlockState{
bt: blocktree.NewBlockTreeFromRoot(header),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
unfinalisedBlocks: newHashToBlockMap(),
tries: trs,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
Expand Down Expand Up @@ -184,56 +184,19 @@ func (bs *BlockState) GenesisHash() common.Hash {
return bs.genesisHash
}

func (bs *BlockState) storeUnfinalisedBlock(block *types.Block) {
bs.unfinalisedBlocks.Store(block.Header.Hash(), block)
}

func (bs *BlockState) hasUnfinalisedBlock(hash common.Hash) bool {
_, has := bs.unfinalisedBlocks.Load(hash)
return has
}

func (bs *BlockState) getUnfinalisedHeader(hash common.Hash) (*types.Header, bool) {
block, has := bs.getUnfinalisedBlock(hash)
if !has {
return nil, false
}

return &block.Header, true
}

func (bs *BlockState) getUnfinalisedBlock(hash common.Hash) (*types.Block, bool) {
block, has := bs.unfinalisedBlocks.Load(hash)
if !has {
return nil, false
}

// TODO: dot/core tx re-org test seems to abort here due to block body being invalid?
return block.(*types.Block), true
}

func (bs *BlockState) getAndDeleteUnfinalisedBlock(hash common.Hash) (*types.Block, bool) {
block, has := bs.unfinalisedBlocks.LoadAndDelete(hash)
if !has {
return nil, false
}

return block.(*types.Block), true
}

// HasHeader returns if the db contains a header with the given hash
func (bs *BlockState) HasHeader(hash common.Hash) (bool, error) {
if bs.hasUnfinalisedBlock(hash) {
if bs.unfinalisedBlocks.getBlock(hash) != nil {
return true, nil
}

return bs.db.Has(headerKey(hash))
}

// GetHeader returns a BlockHeader for a given hash
func (bs *BlockState) GetHeader(hash common.Hash) (*types.Header, error) {
header, has := bs.getUnfinalisedHeader(hash)
if has {
func (bs *BlockState) GetHeader(hash common.Hash) (header *types.Header, err error) {
header = bs.unfinalisedBlocks.getBlockHeader(hash)
if header != nil {
return header, nil
}

Expand Down Expand Up @@ -313,8 +276,8 @@ func (bs *BlockState) GetBlockByHash(hash common.Hash) (*types.Block, error) {
bs.RLock()
defer bs.RUnlock()

block, has := bs.getUnfinalisedBlock(hash)
if has {
block := bs.unfinalisedBlocks.getBlock(hash)
if block != nil {
return block, nil
}

Expand Down Expand Up @@ -346,18 +309,18 @@ func (bs *BlockState) HasBlockBody(hash common.Hash) (bool, error) {
bs.RLock()
defer bs.RUnlock()

if bs.hasUnfinalisedBlock(hash) {
if bs.unfinalisedBlocks.getBlock(hash) != nil {
return true, nil
}

return bs.db.Has(blockBodyKey(hash))
}

// GetBlockBody will return Body for a given hash
func (bs *BlockState) GetBlockBody(hash common.Hash) (*types.Body, error) {
block, has := bs.getUnfinalisedBlock(hash)
if has {
return &block.Body, nil
func (bs *BlockState) GetBlockBody(hash common.Hash) (body *types.Body, err error) {
body = bs.unfinalisedBlocks.getBlockBody(hash)
if body != nil {
return body, nil
}

data, err := bs.db.Get(blockBodyKey(hash))
Expand Down Expand Up @@ -417,7 +380,7 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
return err
}

bs.storeUnfinalisedBlock(block)
bs.unfinalisedBlocks.store(block)
go bs.notifyImported(block)
return nil
}
Expand All @@ -433,7 +396,7 @@ func (bs *BlockState) AddBlockToBlockTree(block *types.Block) error {
arrivalTime = time.Now()
}

bs.storeUnfinalisedBlock(block)
bs.unfinalisedBlocks.store(block)
return bs.bt.AddBlock(&block.Header, arrivalTime)
}

Expand Down
20 changes: 10 additions & 10 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er

pruned := bs.bt.Prune(hash)
for _, hash := range pruned {
block, has := bs.getAndDeleteUnfinalisedBlock(hash)
if !has {
blockHeader := bs.unfinalisedBlocks.delete(hash)
if blockHeader == nil {
continue
}

bs.tries.delete(block.Header.StateRoot)
bs.tries.delete(blockHeader.StateRoot)

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)
logger.Tracef("pruned block number %s with hash %s", blockHeader.Number, hash)
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -207,8 +207,8 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
continue
}

block, has := bs.getUnfinalisedBlock(hash)
if !has {
block := bs.unfinalisedBlocks.getBlock(hash)
if block == nil {
return fmt.Errorf("failed to find block in unfinalised block map, block=%s", hash)
}

Expand All @@ -234,14 +234,14 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
}

// delete from the unfinalisedBlockMap and delete reference to in-memory trie
block, has = bs.getAndDeleteUnfinalisedBlock(hash)
if !has {
blockHeader := bs.unfinalisedBlocks.delete(hash)
if blockHeader == nil {
continue
}

bs.tries.delete(block.Header.StateRoot)
bs.tries.delete(blockHeader.StateRoot)

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", blockHeader.Number, hash)
}

return batch.Flush()
Expand Down
91 changes: 91 additions & 0 deletions dot/state/hashtoblockmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package state

import (
"sync"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
)

// hashToBlockMap implements a thread safe map of block header hashes
// to block pointers. It has helper methods to fit the needs of callers
// in this package.
type hashToBlockMap struct {
mutex sync.RWMutex
mapping map[common.Hash]*types.Block
}

func newHashToBlockMap() *hashToBlockMap {
return &hashToBlockMap{
mapping: make(map[common.Hash]*types.Block),
}
}

// getBlock returns a pointer to the block stored at the hash given,
// or nil if not found.
// Note this returns a pointer to the block so modifying the returned value
// will modify the block stored in the map, potentially leading to data races
// or unwanted changes, so be careful.
func (h *hashToBlockMap) getBlock(hash common.Hash) (block *types.Block) {
h.mutex.RLock()
defer h.mutex.RUnlock()
return h.mapping[hash]
}

// getBlockHeader returns a pointer to the header of the block stored at the
// hash given, or nil if not found.
// Note this returns a pointer to the header of the block so modifying the
// returned value will modify the header of the block stored in the map,
// potentially leading to data races or unwanted changes, so be careful.
func (h *hashToBlockMap) getBlockHeader(hash common.Hash) (header *types.Header) {
h.mutex.RLock()
defer h.mutex.RUnlock()
block := h.mapping[hash]
if block == nil {
return nil
}
return &block.Header
}

// getBlockBody returns a pointer to the body of the block stored at the
// hash given, or nil if not found.
// Note this returns a pointer to the body of the block so modifying the
// returned value will modify the body of the block stored in the map,
// potentially leading to data races or unwanted changes, so be careful.
func (h *hashToBlockMap) getBlockBody(hash common.Hash) (body *types.Body) {
h.mutex.RLock()
defer h.mutex.RUnlock()
block := h.mapping[hash]
if block == nil {
return nil
}
return &block.Body
}

// store stores a block and uses its header hash digest as key.
// Note the block is not deep copied so mutating the passed argument
// will lead to mutation for the block in the map and returned by this map.
// Also note this operation sets the hash field on the block header because of
// the call to block.Header.Hash().
func (h *hashToBlockMap) store(block *types.Block) {
h.mutex.Lock()
defer h.mutex.Unlock()
h.mapping[block.Header.Hash()] = block
}

// delete deletes the block stored at the hash given, and returns
// a pointer to the header of the block deleted from the map,
// or nil if the block is not found.
func (h *hashToBlockMap) delete(hash common.Hash) (deletedHeader *types.Header) {
h.mutex.Lock()
defer h.mutex.Unlock()
block := h.mapping[hash]
delete(h.mapping, hash)
if block == nil {
return nil
}
return &block.Header
}
Loading

0 comments on commit 666795b

Please sign in to comment.