Skip to content

Commit

Permalink
vote: backup validator sync votes from corresponding mining validator (
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanBSC authored Aug 2, 2023
1 parent 4525cff commit 522d4cd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
29 changes: 26 additions & 3 deletions core/vote/vote_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type VoteManager struct {
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription

// used for backup validators to sync votes from corresponding mining validator
syncVoteCh chan core.NewVoteEvent
syncVoteSub event.Subscription

pool *VotePool
signer *VoteSigner
journal *VoteJournal
Expand All @@ -46,9 +50,9 @@ func NewVoteManager(eth Backend, chainconfig *params.ChainConfig, chain *core.Bl
chain: chain,
chainconfig: chainconfig,
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),

pool: pool,
engine: engine,
syncVoteCh: make(chan core.NewVoteEvent, voteBufferForPut),
pool: pool,
engine: engine,
}

// Create voteSigner.
Expand All @@ -69,6 +73,7 @@ func NewVoteManager(eth Backend, chainconfig *params.ChainConfig, chain *core.Bl

// Subscribe to chain head event.
voteManager.chainHeadSub = voteManager.chain.SubscribeChainHeadEvent(voteManager.chainHeadCh)
voteManager.syncVoteSub = voteManager.pool.SubscribeNewVoteEvent(voteManager.syncVoteCh)

go voteManager.loop()

Expand All @@ -77,6 +82,9 @@ func NewVoteManager(eth Backend, chainconfig *params.ChainConfig, chain *core.Bl

func (voteManager *VoteManager) loop() {
log.Debug("vote manager routine loop started")
defer voteManager.chainHeadSub.Unsubscribe()
defer voteManager.syncVoteSub.Unsubscribe()

events := voteManager.eth.EventMux().Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
log.Debug("vote manager loop defer func occur")
Expand Down Expand Up @@ -164,6 +172,21 @@ func (voteManager *VoteManager) loop() {
voteManager.pool.PutVote(voteMessage)
votesManagerCounter.Inc(1)
}
case event := <-voteManager.syncVoteCh:
voteMessage := event.Vote
if voteManager.eth.IsMining() || !voteManager.signer.UsingKey(&voteMessage.VoteAddress) {
continue
}
if err := voteManager.journal.WriteVote(voteMessage); err != nil {
log.Error("Failed to write vote into journal", "err", err)
voteJournalErrorCounter.Inc(1)
continue
}
log.Debug("vote manager synced vote", "votedBlockNumber", voteMessage.Data.TargetNumber, "votedBlockHash", voteMessage.Data.TargetHash, "voteMessageHash", voteMessage.Hash())
votesManagerCounter.Inc(1)
case <-voteManager.syncVoteSub.Err():
log.Debug("voteManager subscribed votes failed")
return
case <-voteManager.chainHeadSub.Err():
log.Debug("voteManager subscribed chainHead failed")
return
Expand Down
2 changes: 2 additions & 0 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func NewVotePool(chainconfig *params.ChainConfig, chain *core.BlockChain, engine

// loop is the vote pool's main even loop, waiting for and reacting to outside blockchain events and votes channel event.
func (pool *VotePool) loop() {
defer pool.chainHeadSub.Unsubscribe()

for {
select {
// Handle ChainHeadEvent.
Expand Down
5 changes: 5 additions & 0 deletions core/vote/vote_signer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vote

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -104,3 +105,7 @@ func (signer *VoteSigner) SignVote(vote *types.VoteEnvelope) error {
copy(vote.Signature[:], signature.Marshal()[:])
return nil
}

func (signer *VoteSigner) UsingKey(bLSPublicKey *types.BLSPublicKey) bool {
return bytes.Equal(signer.pubKey[:], bLSPublicKey[:])
}

0 comments on commit 522d4cd

Please sign in to comment.