diff --git a/blocksync/reactor.go b/blocksync/reactor.go index cd5a811688e..86f36f3e16d 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -3,6 +3,7 @@ package blocksync import ( "fmt" "reflect" + "strings" "time" "github.com/cometbft/cometbft/libs/log" @@ -56,8 +57,9 @@ type Reactor struct { pool *BlockPool blockSync bool - requestsCh <-chan BlockRequest - errorsCh <-chan peerError + requestsCh <-chan BlockRequest + errorsCh <-chan peerError + appHashErrorsCh chan p2p.AppHashError } // NewReactor returns new reactor instance. @@ -71,8 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS requestsCh := make(chan BlockRequest, maxTotalRequesters) - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors startHeight := store.Height() + 1 if startHeight == 1 { @@ -81,13 +84,14 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS pool := NewBlockPool(startHeight, requestsCh, errorsCh) bcR := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: pool, - blockSync: blockSync, - requestsCh: requestsCh, - errorsCh: errorsCh, + initialState: state, + blockExec: blockExec, + store: store, + pool: pool, + blockSync: blockSync, + requestsCh: requestsCh, + errorsCh: errorsCh, + appHashErrorsCh: appHashErrorsCh, } bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR) return bcR @@ -361,6 +365,19 @@ FOR_LOOP: } if err != nil { + // If this is an appHash or lastResultsHash error, also pass to the appHashError channel. + if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height), + } + } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height - 1), + } + } + bcR.Logger.Error("Error in validation", "err", err) peerID := bcR.pool.RedoRequest(first.Height) peer := bcR.Switch.Peers().Get(peerID) @@ -415,3 +432,7 @@ func (bcR *Reactor) BroadcastStatusRequest() { Message: &bcproto.StatusRequest{}, }) } + +func (bcR *Reactor) AppHashErrorsCh() chan p2p.AppHashError { + return bcR.appHashErrorsCh +} diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 861f5e4277f..cc1fa39dd4a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -604,3 +604,5 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { } func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } + +func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3a137a78668..3918d1bb797 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -21,9 +21,10 @@ import ( // peers you received it from. type Reactor struct { p2p.BaseReactor - config *cfg.MempoolConfig - mempool *TxMempool - ids *mempoolIDs + config *cfg.MempoolConfig + mempool *TxMempool + ids *mempoolIDs + appHashErrorsCh chan p2p.AppHashError } type mempoolIDs struct { @@ -280,3 +281,7 @@ type TxsMessage struct { func (m *TxsMessage) String() string { return fmt.Sprintf("[TxsMessage %v]", m.Txs) } + +func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError { + return memR.appHashErrorsCh +} diff --git a/node/node.go b/node/node.go index 4e5238d1dad..ac3951b980f 100644 --- a/node/node.go +++ b/node/node.go @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor { return n.consensusReactor } +// BCReactor returns the Node's BlockchainReactor. +func (n *Node) BCReactor() p2p.Reactor { + return n.bcReactor +} + // MempoolReactor returns the Node's mempool reactor. func (n *Node) MempoolReactor() p2p.Reactor { return n.mempoolReactor diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index fe56283728e..d9e2f7b3396 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -1,10 +1,21 @@ package p2p import ( + "fmt" + "github.com/cometbft/cometbft/libs/service" "github.com/cometbft/cometbft/p2p/conn" ) +type AppHashError struct { + Err error + Height uint64 +} + +func (e AppHashError) Error() string { + return fmt.Sprintf("app hash error at height %v: %s", e.Height, e.Err.Error()) +} + // Reactor is responsible for handling incoming messages on one or more // Channel. Switch calls GetChannels when reactor is added to it. When a new // peer joins our node, InitPeer and AddPeer are called. RemovePeer is called @@ -41,6 +52,10 @@ type Reactor interface { // ReceiveEnvelope is called by the switch when an envelope is received from any connected // peer on any of the channels registered by the reactor. ReceiveEnvelope(Envelope) + + // AppHashErrorsCh is used to stream hash errors to the sdk, which is then used + // to provide further debugging information in logs to the user. + AppHashErrorsCh() chan AppHashError } //-------------------------------------- @@ -48,12 +63,14 @@ type Reactor interface { type BaseReactor struct { service.BaseService // Provides Start, Stop, .Quit Switch *Switch + AppHashErrorChanBR chan AppHashError } func NewBaseReactor(name string, impl Reactor) *BaseReactor { return &BaseReactor{ - BaseService: *service.NewBaseService(nil, name, impl), - Switch: nil, + BaseService: *service.NewBaseService(nil, name, impl), + Switch: nil, + AppHashErrorChanBR: impl.AppHashErrorsCh(), } } @@ -66,3 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) ReceiveEnvelope(e Envelope) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } +func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }