diff --git a/core/blockchain.go b/core/blockchain.go index 7f905bf55b..06e2a68302 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -259,23 +259,25 @@ type BlockChain struct { triesInMemory uint64 txIndexer *txIndexer // Transaction indexer, might be nil if not enabled - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - chainBlockFeed event.Feed - logsFeed event.Feed - blockProcFeed event.Feed - finalizedHeaderFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + chainBlockFeed event.Feed + logsFeed event.Feed + blockProcFeed event.Feed + finalizedHeaderFeed event.Feed + highestVerifiedBlockFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block // This mutex synchronizes chain write operations. // Readers don't need to take it, they can just read the database. chainmu *syncx.ClosableMutex highestVerifiedHeader atomic.Pointer[types.Header] + highestVerifiedBlock atomic.Pointer[types.Header] currentBlock atomic.Pointer[types.Header] // Current head of the chain currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block @@ -400,6 +402,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } bc.highestVerifiedHeader.Store(nil) + bc.highestVerifiedBlock.Store(nil) bc.currentBlock.Store(nil) bc.currentSnapBlock.Store(nil) bc.chasingHead.Store(nil) @@ -1925,8 +1928,12 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types if err != nil { return NonStatTy, err } - if reorg && mux != nil { - mux.Post(NewSealedBlockEvent{Block: block}) + if reorg { + bc.highestVerifiedBlock.Store(types.CopyHeader(block.Header())) + bc.highestVerifiedBlockFeed.Send(HighestVerifiedBlockEvent{Header: block.Header()}) + if mux != nil { + mux.Post(NewSealedBlockEvent{Block: block}) + } } if err := bc.writeBlockWithState(block, receipts, state); err != nil { diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index d440590b8b..6966311dfd 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -98,6 +98,15 @@ func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header { return bc.hc.GetHeaderByHash(hash) } +// GetVerifiedBlockByHash retrieves the header of a verified block, it may be only in memory. +func (bc *BlockChain) GetVerifiedBlockByHash(hash common.Hash) *types.Header { + highestVerifiedBlock := bc.highestVerifiedBlock.Load() + if highestVerifiedBlock != nil && highestVerifiedBlock.Hash() == hash { + return highestVerifiedBlock + } + return bc.hc.GetHeaderByHash(hash) +} + // GetHeaderByNumber retrieves a block header from the database by number, // caching it (associated with its hash) if found. func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header { @@ -486,6 +495,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeHighestVerifiedBlockEvent registers a subscription of HighestVerifiedBlockEvent. +func (bc *BlockChain) SubscribeHighestVerifiedHeaderEvent(ch chan<- HighestVerifiedBlockEvent) event.Subscription { + return bc.scope.Track(bc.highestVerifiedBlockFeed.Subscribe(ch)) +} + // SubscribeChainBlockEvent registers a subscription of ChainBlockEvent. func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription { return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch)) diff --git a/core/events.go b/core/events.go index c479835662..bfd0d3534a 100644 --- a/core/events.go +++ b/core/events.go @@ -53,3 +53,5 @@ type ChainSideEvent struct { } type ChainHeadEvent struct{ Block *types.Block } + +type HighestVerifiedBlockEvent struct{ Header *types.Header } diff --git a/core/vote/vote_manager.go b/core/vote/vote_manager.go index 1ba49f627f..ba422e5773 100644 --- a/core/vote/vote_manager.go +++ b/core/vote/vote_manager.go @@ -33,8 +33,8 @@ type VoteManager struct { chain *core.BlockChain - chainHeadCh chan core.ChainHeadEvent - chainHeadSub event.Subscription + highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent + highestVerifiedBlockSub event.Subscription // used for backup validators to sync votes from corresponding mining validator syncVoteCh chan core.NewVoteEvent @@ -49,12 +49,12 @@ type VoteManager struct { func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journalPath, blsPasswordPath, blsWalletPath string, engine consensus.PoSA) (*VoteManager, error) { voteManager := &VoteManager{ - eth: eth, - chain: chain, - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut), - pool: pool, - engine: engine, + eth: eth, + chain: chain, + highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize), + syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut), + pool: pool, + engine: engine, } // Create voteSigner. @@ -74,7 +74,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal voteManager.journal = voteJournal // Subscribe to chain head event. - voteManager.chainHeadSub = voteManager.chain.SubscribeChainHeadEvent(voteManager.chainHeadCh) + voteManager.highestVerifiedBlockSub = voteManager.chain.SubscribeHighestVerifiedHeaderEvent(voteManager.highestVerifiedBlockCh) voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh) go voteManager.loop() @@ -84,7 +84,7 @@ func NewVoteManager(eth Backend, chain *core.BlockChain, pool *VotePool, journal func (voteManager *VoteManager) loop() { log.Debug("vote manager routine loop started") - defer voteManager.chainHeadSub.Unsubscribe() + defer voteManager.highestVerifiedBlockSub.Unsubscribe() defer voteManager.syncVoteSub.Unsubscribe() events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) @@ -119,7 +119,7 @@ func (voteManager *VoteManager) loop() { log.Debug("downloader is in DoneEvent mode, set the startVote flag to true") startVote = true } - case cHead := <-voteManager.chainHeadCh: + case cHead := <-voteManager.highestVerifiedBlockCh: if !startVote { log.Debug("startVote flag is false, continue") continue @@ -135,12 +135,12 @@ func (voteManager *VoteManager) loop() { continue } - if cHead.Block == nil { - log.Debug("cHead.Block is nil, continue") + if cHead.Header == nil { + log.Debug("cHead.Header is nil, continue") continue } - curHead := cHead.Block.Header() + curHead := cHead.Header if p, ok := voteManager.engine.(*parlia.Parlia); ok { nextBlockMinedTime := time.Unix(int64((curHead.Time + p.Period())), 0) timeForBroadcast := 50 * time.Millisecond // enough to broadcast a vote @@ -217,7 +217,7 @@ func (voteManager *VoteManager) loop() { case <-voteManager.syncVoteSub.Err(): log.Debug("voteManager subscribed votes failed") return - case <-voteManager.chainHeadSub.Err(): + case <-voteManager.highestVerifiedBlockSub.Err(): log.Debug("voteManager subscribed chainHead failed") return } diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go index defa92674b..3b64e2cea3 100644 --- a/core/vote/vote_pool.go +++ b/core/vote/vote_pool.go @@ -24,7 +24,7 @@ const ( lowerLimitOfVoteBlockNumber = 256 upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist - chainHeadChanSize = 10 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent. ) var ( @@ -57,8 +57,8 @@ type VotePool struct { curVotesPq *votesPriorityQueue futureVotesPq *votesPriorityQueue - chainHeadCh chan core.ChainHeadEvent - chainHeadSub event.Subscription + highestVerifiedBlockCh chan core.HighestVerifiedBlockEvent + highestVerifiedBlockSub event.Subscription votesCh chan *types.VoteEnvelope @@ -69,19 +69,19 @@ type votesPriorityQueue []*types.VoteData func NewVotePool(chain *core.BlockChain, engine consensus.PoSA) *VotePool { votePool := &VotePool{ - chain: chain, - receivedVotes: mapset.NewSet[common.Hash](), - curVotes: make(map[common.Hash]*VoteBox), - futureVotes: make(map[common.Hash]*VoteBox), - curVotesPq: &votesPriorityQueue{}, - futureVotesPq: &votesPriorityQueue{}, - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - votesCh: make(chan *types.VoteEnvelope, voteBufferForPut), - engine: engine, + chain: chain, + receivedVotes: mapset.NewSet[common.Hash](), + curVotes: make(map[common.Hash]*VoteBox), + futureVotes: make(map[common.Hash]*VoteBox), + curVotesPq: &votesPriorityQueue{}, + futureVotesPq: &votesPriorityQueue{}, + highestVerifiedBlockCh: make(chan core.HighestVerifiedBlockEvent, highestVerifiedBlockChanSize), + votesCh: make(chan *types.VoteEnvelope, voteBufferForPut), + engine: engine, } // Subscribe events from blockchain and start the main event loop. - votePool.chainHeadSub = votePool.chain.SubscribeChainHeadEvent(votePool.chainHeadCh) + votePool.highestVerifiedBlockSub = votePool.chain.SubscribeHighestVerifiedHeaderEvent(votePool.highestVerifiedBlockCh) go votePool.loop() return votePool @@ -89,18 +89,18 @@ func NewVotePool(chain *core.BlockChain, engine consensus.PoSA) *VotePool { // loop is the vote pool's main even loop, waiting for and reacting to outside blockchain events and votes channel event. func (pool *VotePool) loop() { - defer pool.chainHeadSub.Unsubscribe() + defer pool.highestVerifiedBlockSub.Unsubscribe() for { select { // Handle ChainHeadEvent. - case ev := <-pool.chainHeadCh: - if ev.Block != nil { - latestBlockNumber := ev.Block.NumberU64() + case ev := <-pool.highestVerifiedBlockCh: + if ev.Header != nil { + latestBlockNumber := ev.Header.Number.Uint64() pool.prune(latestBlockNumber) - pool.transferVotesFromFutureToCur(ev.Block.Header()) + pool.transferVotesFromFutureToCur(ev.Header) } - case <-pool.chainHeadSub.Err(): + case <-pool.highestVerifiedBlockSub.Err(): return // Handle votes channel and put the vote into vote pool. @@ -135,7 +135,7 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { var votesPq *votesPriorityQueue isFutureVote := false - voteBlock := pool.chain.GetHeaderByHash(targetHash) + voteBlock := pool.chain.GetVerifiedBlockByHash(targetHash) if voteBlock == nil { votes = pool.futureVotes votesPq = pool.futureVotesPq @@ -226,7 +226,7 @@ func (pool *VotePool) transferVotesFromFutureToCur(latestBlockHeader *types.Head futurePqBuffer := make([]*types.VoteData, 0) for futurePq.Len() > 0 && futurePq.Peek().TargetNumber <= latestBlockNumber { blockHash := futurePq.Peek().TargetHash - header := pool.chain.GetHeaderByHash(blockHash) + header := pool.chain.GetVerifiedBlockByHash(blockHash) if header == nil { // Put into pq buffer used for later put again into futurePq futurePqBuffer = append(futurePqBuffer, heap.Pop(futurePq).(*types.VoteData))