Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dot/network, lib/grandpa): update network.ConsensusMessage, add grandpa.NeighbourMessage and handle accordingly #1519

Merged
merged 37 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e6b4efd
don't start justification requesting if syncing
noot Apr 6, 2021
60246a2
fix
noot Apr 6, 2021
a475b73
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Apr 8, 2021
f976f1e
write handshake directly to stream
noot Apr 12, 2021
87b1f17
lint
noot Apr 12, 2021
bf7072f
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Apr 12, 2021
3915d03
cleanup
noot Apr 12, 2021
dce28d6
lint
noot Apr 12, 2021
223f1aa
cleanup
noot Apr 12, 2021
b006e57
fix log
noot Apr 12, 2021
9b805ab
fix test
noot Apr 12, 2021
b4243ef
keep all unfinalized tries in memory
noot Apr 12, 2021
5388e0e
check BlockRequestMessage.From length
noot Apr 12, 2021
14e6c4a
cleanup
noot Apr 12, 2021
8256bcb
fix error
noot Apr 12, 2021
d6b192c
add grandpa NeighbourMessage type and handle accordingly
noot Apr 13, 2021
bf13265
fix test
noot Apr 13, 2021
d3e30e1
fix grandpa tests
noot Apr 13, 2021
a3620c0
lint
noot Apr 13, 2021
c267ccb
lint
noot Apr 13, 2021
9e8189f
fix test
noot Apr 13, 2021
41fe922
merge w/ development
noot Apr 13, 2021
4b5d23b
move persistent peer reconnect to goroutine
noot Apr 13, 2021
9a6e937
Merge branch 'noot/notification-messages' of github.com:ChainSafe/gos…
noot Apr 13, 2021
d02b174
cleanup
noot Apr 13, 2021
8708374
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Apr 13, 2021
2d50d14
update blocktree err checking in syncer
noot Apr 13, 2021
21f464f
Merge branch 'development' into noot/notification-messages
noot Apr 13, 2021
28cab3e
re-add justification request logic
noot Apr 13, 2021
004b5f6
add test for re-org check
noot Apr 13, 2021
83b5dfd
Merge branch 'noot/notification-messages' of github.com:ChainSafe/gos…
noot Apr 13, 2021
7622914
add grandpa msg handler tests
noot Apr 13, 2021
6d108d8
lint
noot Apr 13, 2021
8a7a145
lint
noot Apr 13, 2021
6d45402
add test
noot Apr 14, 2021
2012c33
merge w development
noot Apr 14, 2021
1c0a7ff
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Apr 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,6 @@ var _ NotificationsMessage = &ConsensusMessage{}

// ConsensusMessage is mostly opaque to us
type ConsensusMessage struct {
// Identifies consensus engine.
ConsensusEngineID types.ConsensusEngineID
// Message payload.
Data []byte
}

Expand All @@ -377,23 +374,17 @@ func (cm *ConsensusMessage) Type() byte {

// String is the string
func (cm *ConsensusMessage) String() string {
return fmt.Sprintf("ConsensusMessage ConsensusEngineID=%d, DATA=%x", cm.ConsensusEngineID, cm.Data)
return fmt.Sprintf("ConsensusMessage Data=%x", cm.Data)
}

// Encode encodes a block response message using SCALE
func (cm *ConsensusMessage) Encode() ([]byte, error) {
encMsg := cm.ConsensusEngineID.ToBytes()
return append(encMsg, cm.Data...), nil
return cm.Data, nil
}

// Decode the message into a ConsensusMessage
func (cm *ConsensusMessage) Decode(in []byte) error {
if len(in) < 5 {
return errors.New("cannot decode ConsensusMessage: encoding is too short")
}

cm.ConsensusEngineID = types.NewConsensusEngineID(in[:4])
cm.Data = in[4:]
cm.Data = in
return nil
}

Expand Down
9 changes: 2 additions & 7 deletions dot/network/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,8 @@ func TestDecodeTransactionMessageTwoExtrinsics(t *testing.T) {
}

func TestDecodeConsensusMessage(t *testing.T) {
ConsensusEngineID := types.BabeEngineID

testID := hex.EncodeToString(types.BabeEngineID.ToBytes())
testData := "03100405"

msg := "0x" + testID + testData // 0x4241424503100405
msg := "0x" + testData

encMsg, err := common.HexToBytes(msg)
require.Nil(t, err)
Expand All @@ -358,8 +354,7 @@ func TestDecodeConsensusMessage(t *testing.T) {
require.Nil(t, err)

expected := &ConsensusMessage{
ConsensusEngineID: ConsensusEngineID,
Data: out,
Data: out,
}

require.Equal(t, expected, m)
Expand Down
16 changes: 13 additions & 3 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/common/variadic"
Expand Down Expand Up @@ -163,7 +164,6 @@ func newSyncQueue(s *Service) *syncQueue {
func (q *syncQueue) start() {
go q.handleResponseQueue()
go q.syncAtHead()
go q.finalizeAtHead()

go q.processBlockRequests()
go q.processBlockResponses()
Expand Down Expand Up @@ -693,7 +693,7 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
func (q *syncQueue) handleBlockData(data []*types.BlockData) {
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling
panic(err) // this should never happen
}

end := data[len(data)-1].Number().Int64()
Expand Down Expand Up @@ -738,13 +738,23 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {
func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) {
logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err)

if errors.Is(err, chaindb.ErrKeyNotFound) {
if errors.Is(err, chaindb.ErrKeyNotFound) || errors.Is(err, blocktree.ErrParentNotFound) {
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err)
}

header, err := types.NewHeaderFromOptional(data[idx].Header)
if err != nil {
logger.Debug("failed to get header from BlockData", "idx", idx, "error", err)
return
}

// don't request a chain that's been dropped
if header.Number.Int64() <= finalized.Number.Int64() {
return
}

parentHash := header.ParentHash
req := createBlockRequestWithHash(parentHash, 0)

Expand Down
80 changes: 63 additions & 17 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ const pruneKeyBufferSize = 1000

// BlockState defines fields for manipulating the state of blocks, such as BlockTree, BlockDB and Header
type BlockState struct {
bt *blocktree.BlockTree
baseDB chaindb.Database
db chaindb.Database
lock sync.RWMutex
bt *blocktree.BlockTree
baseDB chaindb.Database
db chaindb.Database
sync.RWMutex
genesisHash common.Hash

// block notifiers
Expand Down Expand Up @@ -268,7 +268,7 @@ func (bs *BlockState) GetHeader(hash common.Hash) (*types.Header, error) {
func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return common.Hash{}, fmt.Errorf("cannot get block %d: %s", num, err)
return common.Hash{}, fmt.Errorf("cannot get block %d: %w", num, err)
}

return common.NewHash(bh), nil
Expand All @@ -278,7 +278,7 @@ func (bs *BlockState) GetHashByNumber(num *big.Int) (common.Hash, error) {
func (bs *BlockState) GetHeaderByNumber(num *big.Int) (*types.Header, error) {
bh, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
}

hash := common.NewHash(bh)
Expand All @@ -304,7 +304,7 @@ func (bs *BlockState) GetBlockByNumber(num *big.Int) (*types.Block, error) {
// First retrieve the block hash in a byte array based on the block number from the database
byteHash, err := bs.db.Get(headerHashKey(num.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", num, err)
return nil, fmt.Errorf("cannot get block %d: %w", num, err)
}

// Then find the block based on the hash
Expand All @@ -322,17 +322,14 @@ func (bs *BlockState) GetBlockHash(blockNumber *big.Int) (*common.Hash, error) {
// First retrieve the block hash in a byte array based on the block number from the database
byteHash, err := bs.db.Get(headerHashKey(blockNumber.Uint64()))
if err != nil {
return nil, fmt.Errorf("cannot get block %d: %s", blockNumber, err)
return nil, fmt.Errorf("cannot get block %d: %w", blockNumber, err)
}
hash := common.NewHash(byteHash)
return &hash, nil
}

// SetHeader will set the header into DB
func (bs *BlockState) SetHeader(header *types.Header) error {
arijitAD marked this conversation as resolved.
Show resolved Hide resolved
bs.lock.Lock()
defer bs.lock.Unlock()

hash := header.Hash()

// Write the encoded header
Expand Down Expand Up @@ -366,11 +363,7 @@ func (bs *BlockState) GetBlockBody(hash common.Hash) (*types.Body, error) {

// SetBlockBody will add a block body to the db
func (bs *BlockState) SetBlockBody(hash common.Hash, body *types.Body) error {
bs.lock.Lock()
defer bs.lock.Unlock()
Comment on lines -369 to -370
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a confused why some locks were removed (lines 333, and 369), but others added in different spots (add at line 423, 495 and 593). Doesn't seem to have any race conditions now, so this is working. How do we determine where locks are needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the db operations are already threadsafe (chaindb has locks for all db.Put, db.Get, etc.), and same for the blocktree, so I only added locks for functions that did multiple operations in them where some variables need to not change until the function is done, for example AddBlock requires no other blocks to be added until it's finished


err := bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
return err
return bs.db.Put(blockBodyKey(hash), body.AsOptional().Value())
}

// HasFinalizedBlock returns true if there is a finalized block for a given round and setID, false otherwise
Expand Down Expand Up @@ -427,6 +420,9 @@ func (bs *BlockState) GetFinalizedHash(round, setID uint64) (common.Hash, error)

// SetFinalizedHash sets the latest finalized block header
func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) error {
bs.Lock()
defer bs.Unlock()

go bs.notifyFinalized(hash)
if round > 0 {
err := bs.SetRound(round)
Expand Down Expand Up @@ -496,6 +492,8 @@ func (bs *BlockState) CompareAndSetBlockData(bd *types.BlockData) error {

// AddBlock adds a block to the blocktree and the DB with arrival time as current unix time
func (bs *BlockState) AddBlock(block *types.Block) error {
bs.Lock()
defer bs.Unlock()
return bs.AddBlockWithArrivalTime(block, time.Now())
}

Expand All @@ -506,6 +504,8 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
return err
}

prevHead := bs.bt.DeepestBlockHash()

// add block to blocktree
err = bs.bt.AddBlock(block.Header, uint64(arrivalTime.UnixNano()))
if err != nil {
Expand Down Expand Up @@ -541,12 +541,58 @@ func (bs *BlockState) AddBlockWithArrivalTime(block *types.Block, arrivalTime ti
return err
}

// check if there was a re-org, if so, re-set the canonical number->hash mapping
err = bs.handleAddedBlock(prevHead, bs.bt.DeepestBlockHash())
if err != nil {
return err
}

go bs.notifyImported(block)
return bs.baseDB.Flush()
}

// handleAddedBlock re-sets the canonical number->hash mapping if there was a chain re-org.
// prev is the previous best block hash before the new block was added to the blocktree.
// curr is the current best blogetck hash.
func (bs *BlockState) handleAddedBlock(prev, curr common.Hash) error {
ancestor, err := bs.HighestCommonAncestor(prev, curr)
if err != nil {
return err
}

// if the highest common ancestor of the previous chain head and current chain head is the previous chain head,
// then the current chain head is the descendant of the previous and thus are on the same chain
if ancestor == prev {
return nil
}

subchain, err := bs.SubChain(ancestor, curr)
if err != nil {
return err
}

batch := bs.db.NewBatch()
for _, hash := range subchain {
// TODO: set number from ancestor.Number + i ?
header, err := bs.GetHeader(hash)
if err != nil {
return fmt.Errorf("failed to get header in subchain: %w", err)
}

err = batch.Put(headerHashKey(header.Number.Uint64()), hash.ToBytes())
if err != nil {
return err
}
}

return batch.Flush()
}

// AddBlockToBlockTree adds the given block to the blocktree. It does not write it to the database.
func (bs *BlockState) AddBlockToBlockTree(header *types.Header) error {
bs.Lock()
defer bs.Unlock()

arrivalTime, err := bs.GetArrivalTime(header.Hash())
if err != nil {
arrivalTime = time.Now()
Expand All @@ -567,7 +613,7 @@ func (bs *BlockState) isBlockOnCurrentChain(header *types.Header) (bool, error)
}

// if the new block is ahead of our best block, then it is on our current chain.
if header.Number.Cmp(bestBlock.Number) == 1 {
if header.Number.Cmp(bestBlock.Number) > 0 {
return true, nil
}

Expand Down
12 changes: 6 additions & 6 deletions dot/state/block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func (bs *BlockState) HasReceipt(hash common.Hash) (bool, error) {

// SetReceipt sets a Receipt in the database
func (bs *BlockState) SetReceipt(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, receiptPrefix), data)
if err != nil {
Expand All @@ -60,8 +60,8 @@ func (bs *BlockState) HasMessageQueue(hash common.Hash) (bool, error) {

// SetMessageQueue sets a MessageQueue in the database
func (bs *BlockState) SetMessageQueue(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, messageQueuePrefix), data)
if err != nil {
Expand All @@ -88,8 +88,8 @@ func (bs *BlockState) HasJustification(hash common.Hash) (bool, error) {

// SetJustification sets a Justification in the database
func (bs *BlockState) SetJustification(hash common.Hash, data []byte) error {
bs.lock.Lock()
defer bs.lock.Unlock()
bs.Lock()
defer bs.Unlock()

err := bs.db.Put(prefixKey(hash, justificationPrefix), data)
if err != nil {
Expand Down
Loading