diff --git a/core/vote/vote_manager.go b/core/vote/vote_manager.go index dcab5c38dd..7c3fbff51e 100644 --- a/core/vote/vote_manager.go +++ b/core/vote/vote_manager.go @@ -164,6 +164,17 @@ func (voteManager *VoteManager) loop() { voteManager.pool.PutVote(voteMessage) votesManagerCounter.Inc(1) } + case voteMessage := <-voteManager.pool.BasicVerifiedVotesCh(): + if voteManager.eth.IsMining() || !voteManager.signer.UsingKey(&voteMessage.VoteAddress) { + continue + } + if err := voteManager.journal.WriteVote(voteMessage); err != nil { + log.Error("Failed to write vote into journal", "err", err) + voteJournalErrorCounter.Inc(1) + continue + } + log.Debug("vote manager synced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash()) + votesManagerCounter.Inc(1) case <-voteManager.chainHeadSub.Err(): log.Debug("voteManager subscribed chainHead failed") return diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go index 28369ed07c..000caa2415 100644 --- a/core/vote/vote_pool.go +++ b/core/vote/vote_pool.go @@ -64,6 +64,9 @@ type VotePool struct { votesCh chan *types.VoteEnvelope + // used for backup validators to sync votes from corresponding mining validator + basicVerifiedVotesCh chan *types.VoteEnvelope + engine consensus.PoSA } @@ -71,16 +74,17 @@ type votesPriorityQueue []*types.VoteData func NewVotePool(chainconfig *params.ChainConfig, chain *core.BlockChain, engine consensus.PoSA) *VotePool { votePool := &VotePool{ - chain: chain, - chainconfig: chainconfig, - receivedVotes: mapset.NewSet(), - curVotes: make(map[common.Hash]*VoteBox), - futureVotes: make(map[common.Hash]*VoteBox), - curVotesPq: &votesPriorityQueue{}, - futureVotesPq: &votesPriorityQueue{}, - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - votesCh: make(chan *types.VoteEnvelope, voteBufferForPut), - engine: engine, + chain: chain, + chainconfig: chainconfig, + receivedVotes: mapset.NewSet(), + curVotes: make(map[common.Hash]*VoteBox), + futureVotes: make(map[common.Hash]*VoteBox), + curVotesPq: &votesPriorityQueue{}, + futureVotesPq: &votesPriorityQueue{}, + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + votesCh: make(chan *types.VoteEnvelope, voteBufferForPut), + basicVerifiedVotesCh: make(chan *types.VoteEnvelope, voteBufferForPut), + engine: engine, } // Subscribe events from blockchain and start the main event loop. @@ -151,6 +155,8 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { return false } + pool.basicVerifiedVotesCh <- vote + if !isFutureVote { // Verify if the vote comes from valid validators based on voteAddress (BLSPublicKey), only verify curVotes here, will verify futureVotes in transfer process. if pool.engine.VerifyVote(pool.chain, vote) != nil { @@ -166,6 +172,9 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool { return true } +func (pool *VotePool) BasicVerifiedVotesCh() <-chan *types.VoteEnvelope { + return pool.basicVerifiedVotesCh +} func (pool *VotePool) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Subscription { return pool.scope.Track(pool.votesFeed.Subscribe(ch)) diff --git a/core/vote/vote_signer.go b/core/vote/vote_signer.go index e6c8010b0b..03d89d53c7 100644 --- a/core/vote/vote_signer.go +++ b/core/vote/vote_signer.go @@ -104,3 +104,7 @@ func (signer *VoteSigner) SignVote(vote *types.VoteEnvelope) error { copy(vote.Signature[:], signature.Marshal()[:]) return nil } + +func (signer *VoteSigner) UsingKey(bLSPublicKey *types.BLSPublicKey) bool { + return types.BLSPublicKey(signer.pubKey) == *bLSPublicKey +}