diff --git a/consensus/consensus.go b/consensus/consensus.go index 18b53e682d..6f019b2a9b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -46,7 +46,7 @@ type DownloadAsync interface { // Consensus is the main struct with all states and data related to consensus process. type Consensus struct { - Decider quorum.Decider + decider quorum.Decider // FBFTLog stores the pbft messages and blocks during FBFT process fBFTLog *FBFTLog // phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc @@ -200,7 +200,9 @@ func (consensus *Consensus) BlocksNotSynchronized(reason string) { // VdfSeedSize returns the number of VRFs for VDF computation func (consensus *Consensus) VdfSeedSize() int { - return int(consensus.Decider.ParticipantsCount()) * 2 / 3 + consensus.mutex.RLock() + defer consensus.mutex.RUnlock() + return int(consensus.decider.ParticipantsCount()) * 2 / 3 } // GetPublicKeys returns the public keys @@ -275,7 +277,7 @@ func New( fBFTLog: NewFBFTLog(), phase: FBFTAnnounce, current: State{mode: Normal}, - Decider: Decider, + decider: Decider, registry: registry, MinPeers: minPeers, AggregateSig: aggregateSig, @@ -322,6 +324,10 @@ func (consensus *Consensus) Registry() *registry.Registry { return consensus.registry } +func (consensus *Consensus) Decider() quorum.Decider { + return quorum.NewThreadSafeDecider(consensus.decider, consensus.mutex) +} + // InitConsensusWithValidators initialize shard state // from latest epoch and update committee pub // keys for consensus diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 40f0bc23d8..48324c4788 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -82,7 +82,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi } func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 { - consensus.Decider.UpdateParticipants(pubKeys, allowlist) + consensus.decider.UpdateParticipants(pubKeys, allowlist) consensus.getLogger().Info().Msg("My Committee updated") for i := range pubKeys { consensus.getLogger().Info(). @@ -91,7 +91,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi Msg("Member") } - allKeys := consensus.Decider.Participants() + allKeys := consensus.decider.Participants() if len(allKeys) != 0 { consensus.LeaderPubKey = &allKeys[0] consensus.getLogger().Info(). @@ -115,7 +115,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi if !consensus.isViewChangingMode() { consensus.resetViewChangeState() } - return consensus.Decider.ParticipantsCount() + return consensus.decider.ParticipantsCount() } // Sign on the hash of the message @@ -144,7 +144,7 @@ func (consensus *Consensus) updateBitmaps() { consensus.getLogger().Debug(). Str("MessageType", consensus.phase.String()). Msg("[UpdateBitmaps] Updating consensus bitmaps") - members := consensus.Decider.Participants() + members := consensus.decider.Participants() prepareBitmap := bls_cosi.NewMask(members) commitBitmap := bls_cosi.NewMask(members) multiSigBitmap := bls_cosi.NewMask(members) @@ -160,7 +160,7 @@ func (consensus *Consensus) resetState() { consensus.blockHash = [32]byte{} consensus.block = []byte{} - consensus.Decider.ResetPrepareAndCommitVotes() + consensus.decider.ResetPrepareAndCommitVotes() if consensus.prepareBitmap != nil { consensus.prepareBitmap.Clear() } @@ -179,7 +179,7 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe } func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { - return consensus.Decider.IndexOf(pubKey) != -1 + return consensus.decider.IndexOf(pubKey) != -1 } // SetMode sets the mode of consensus @@ -271,7 +271,7 @@ func (consensus *Consensus) setBlockNum(blockNum uint64) { // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) { consensus.mutex.RLock() - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.mutex.RUnlock() return consensus.readSignatureBitmapPayload(recvPayload, offset, members) } @@ -334,12 +334,12 @@ func (consensus *Consensus) updateConsensusInformation() Mode { isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) && curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch) haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) && - consensus.Decider.Policy() != quorum.SuperMajorityStake + consensus.decider.Policy() != quorum.SuperMajorityStake // Only happens once, the flip-over to a new Decider policy if isFirstTimeStaking || haventUpdatedDecider { decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID) - consensus.Decider = decider + consensus.decider = decider } var committeeToSet *shard.Committee @@ -412,7 +412,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist()) // Update voters in the committee - if _, err := consensus.Decider.SetVoters( + if _, err := consensus.decider.SetVoters( committeeToSet, epochToSet, ); err != nil { consensus.getLogger().Error(). @@ -582,7 +582,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { return errGetPreparedBlock } - aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants()) if err != nil { return errReadBitmapPayload } @@ -606,7 +606,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, []*bls_cosi.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), @@ -628,7 +628,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 { count := uint32(0) consensus.mutex.Lock() - members := consensus.Decider.Participants() + members := consensus.decider.Participants() pubKeys := consensus.getPublicKeys() consensus.mutex.Unlock() diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 992e725e75..2fe524fdf8 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -18,7 +18,7 @@ import ( ) func TestConsensusInitialization(t *testing.T) { - host, multiBLSPrivateKey, consensus, decider, err := GenerateConsensusForTesting() + host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} @@ -30,7 +30,6 @@ func TestConsensusInitialization(t *testing.T) { expectedTimeouts[timeoutViewChange] = viewChangeDuration expectedTimeouts[timeoutBootstrap] = bootstrapDuration - assert.Equal(t, decider, consensus.Decider) assert.Equal(t, host, consensus.host) assert.Equal(t, messageSender, consensus.msgSender) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 0e1c407057..f4b8c56f09 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -91,7 +91,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p case t == msg_pb.MessageType_VIEWCHANGE: fbftMsg, err = ParseViewChangeMessage(msg) case t == msg_pb.MessageType_NEWVIEW: - members := consensus.Decider.Participants() + members := consensus.decider.Participants() fbftMsg, err = ParseNewViewMessage(msg, members) default: fbftMsg, err = consensus.parseFBFTMessage(msg) @@ -138,7 +138,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p } func (consensus *Consensus) finalCommit() { - numCommits := consensus.Decider.SignersCount(quorum.Commit) + numCommits := consensus.decider.SignersCount(quorum.Commit) consensus.getLogger().Info(). Int64("NumCommits", numCommits). @@ -441,7 +441,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) { Int("numTxs", len(newBlock.Transactions())). Int("numStakingTxs", len(newBlock.StakingTransactions())). Time("startTime", startTime). - Int64("publicKeys", consensus.Decider.ParticipantsCount()). + Int64("publicKeys", consensus.decider.ParticipantsCount()). Msg("[ConsensusMainLoop] STARTING CONSENSUS") consensus.announce(newBlock) }) @@ -741,16 +741,16 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper { for i := 0; i < len(committee.Slots); i++ { if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { - wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, offset) + wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset) } else { - wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset) + wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset) } if !wasFound { utils.Logger().Error().Msg("Failed to get next leader") // Seems like nothing we can do here. return nil } - members := consensus.Decider.Participants() + members := consensus.decider.Participants() mask := bls.NewMask(members) skipped := 0 for i := 0; i < blocksCountAliveness; i++ { diff --git a/consensus/construct.go b/consensus/construct.go index 10488816c7..48291a3644 100644 --- a/consensus/construct.go +++ b/consensus/construct.go @@ -82,7 +82,7 @@ func (consensus *Consensus) construct( ) } else { // TODO: use a persistent bitmap to report bitmap - mask := bls.NewMask(consensus.Decider.Participants()) + mask := bls.NewMask(consensus.decider.Participants()) for _, key := range priKeys { mask.SetKey(key.Pub.Bytes, true) } @@ -161,7 +161,7 @@ func (consensus *Consensus) construct( func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte { buffer := bytes.Buffer{} // 96 bytes aggregated signature - aggSig := consensus.Decider.AggregateVotes(p) + aggSig := consensus.decider.AggregateVotes(p) buffer.Write(aggSig.Serialize()) // Bitmap if p == quorum.Prepare { diff --git a/consensus/construct_test.go b/consensus/construct_test.go index 7188ebea68..c836e78224 100644 --- a/consensus/construct_test.go +++ b/consensus/construct_test.go @@ -81,7 +81,7 @@ func TestConstructPreparedMessage(test *testing.T) { validatorKey := bls.SerializedPublicKey{} validatorKey.FromLibBLSPublicKey(validatorPubKey) validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey} - consensus.Decider.AddNewVote( + consensus.Decider().AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{&leaderKeyWrapper}, leaderPriKey.Sign(message), @@ -89,7 +89,7 @@ func TestConstructPreparedMessage(test *testing.T) { consensus.BlockNum(), consensus.GetCurBlockViewID(), ) - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.Decider().AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{&validatorKeyWrapper}, validatorPriKey.Sign(message), diff --git a/consensus/double_sign.go b/consensus/double_sign.go index 3a8d559fd6..144c67bff7 100644 --- a/consensus/double_sign.go +++ b/consensus/double_sign.go @@ -17,7 +17,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool { if consensus.couldThisBeADoubleSigner(recvMsg) { addrSet := map[common.Address]struct{}{} for _, pubKey2 := range recvMsg.SenderPubkeys { - if alreadyCastBallot := consensus.Decider.ReadBallot( + if alreadyCastBallot := consensus.decider.ReadBallot( quorum.Commit, pubKey2.Bytes, ); alreadyCastBallot != nil { for _, pubKey1 := range alreadyCastBallot.SignerPubKeys { diff --git a/consensus/leader.go b/consensus/leader.go index 0bd934cb7f..747be1eb70 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -62,7 +62,7 @@ func (consensus *Consensus) announce(block *types.Block) { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Prepare, []*bls.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(consensus.blockHash[:]), @@ -112,7 +112,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { prepareBitmap := consensus.prepareBitmap // proceed only when the message is not received before for _, signer := range recvMsg.SenderPubkeys { - signed := consensus.Decider.ReadBallot(quorum.Prepare, signer.Bytes) + signed := consensus.decider.ReadBallot(quorum.Prepare, signer.Bytes) if signed != nil { consensus.getLogger().Debug(). Str("validatorPubKey", signer.Bytes.Hex()). @@ -121,14 +121,14 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { } } - if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { + if consensus.decider.IsQuorumAchieved(quorum.Prepare) { // already have enough signatures consensus.getLogger().Debug(). Interface("validatorPubKeys", recvMsg.SenderPubkeys). Msg("[OnPrepare] Received Additional Prepare Message") return } - signerCount := consensus.Decider.SignersCount(quorum.Prepare) + signerCount := consensus.decider.SignersCount(quorum.Prepare) //// Read - End // Check BLS signature for the multi-sig @@ -161,11 +161,11 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { consensus.getLogger().Debug(). Int64("NumReceivedSoFar", signerCount). - Int64("PublicKeys", consensus.Decider.ParticipantsCount()). + Int64("PublicKeys", consensus.decider.ParticipantsCount()). Msg("[OnPrepare] Received New Prepare Signature") //// Write - Start - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Prepare, recvMsg.SenderPubkeys, &sign, recvMsg.BlockHash, recvMsg.BlockNum, recvMsg.ViewID, @@ -181,7 +181,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { //// Write - End //// Read - Start - if consensus.Decider.IsQuorumAchieved(quorum.Prepare) { + if consensus.decider.IsQuorumAchieved(quorum.Prepare) { // NOTE Let it handle its own logs if err := consensus.didReachPrepareQuorum(); err != nil { return @@ -199,7 +199,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { } // proceed only when the message is not received before for _, signer := range recvMsg.SenderPubkeys { - signed := consensus.Decider.ReadBallot(quorum.Commit, signer.Bytes) + signed := consensus.decider.ReadBallot(quorum.Commit, signer.Bytes) if signed != nil { consensus.getLogger().Debug(). Str("validatorPubKey", signer.Bytes.Hex()). @@ -211,9 +211,9 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { commitBitmap := consensus.commitBitmap // has to be called before verifying signature - quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) + quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit) - signerCount := consensus.Decider.SignersCount(quorum.Commit) + signerCount := consensus.decider.SignersCount(quorum.Commit) //// Read - End // Verify the signature on commitPayload is correct @@ -267,7 +267,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { return } */ - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, recvMsg.SenderPubkeys, &sign, recvMsg.BlockHash, recvMsg.BlockNum, recvMsg.ViewID, @@ -285,7 +285,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { //// Read - Start viewID := consensus.getCurBlockViewID() - if consensus.Decider.IsAllSigsCollected() { + if consensus.decider.IsAllSigsCollected() { logger.Info().Msg("[OnCommit] 100% Enough commits received") consensus.finalCommit() @@ -293,7 +293,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { return } - quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) + quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit) //// Read - End if !quorumWasMet && quorumIsMet { diff --git a/consensus/quorum/thread_safe_decider.go b/consensus/quorum/thread_safe_decider.go new file mode 100644 index 0000000000..9999325f67 --- /dev/null +++ b/consensus/quorum/thread_safe_decider.go @@ -0,0 +1,179 @@ +package quorum + +import ( + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + bls_core "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/consensus/votepower" + "github.com/harmony-one/harmony/crypto/bls" + shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" + "github.com/harmony-one/harmony/multibls" + "github.com/harmony-one/harmony/numeric" + "github.com/harmony-one/harmony/shard" +) + +var _ Decider = threadSafeDeciderImpl{} + +type threadSafeDeciderImpl struct { + mu *sync.RWMutex + decider Decider +} + +func NewThreadSafeDecider(decider Decider, mu *sync.RWMutex) Decider { + return threadSafeDeciderImpl{ + mu: mu, + decider: decider, + } +} + +func (a threadSafeDeciderImpl) String() string { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.String() +} + +func (a threadSafeDeciderImpl) Participants() multibls.PublicKeys { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.Participants() +} + +func (a threadSafeDeciderImpl) IndexOf(key bls.SerializedPublicKey) int { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IndexOf(key) +} + +func (a threadSafeDeciderImpl) ParticipantsCount() int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.ParticipantsCount() +} + +func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextValidator(slotList, pubKey, next) +} + +func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextHmy(instance, pubkey, next) +} + +func (a threadSafeDeciderImpl) NthNextHmyExt(instance shardingconfig.Instance, wrapper *bls.PublicKeyWrapper, i int) (bool, *bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.NthNextHmyExt(instance, wrapper, i) +} + +func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.FirstParticipant(instance) +} + +func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.UpdateParticipants(pubKeys, allowlist) +} + +func (a threadSafeDeciderImpl) submitVote(p Phase, pubkeys []bls.SerializedPublicKey, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.submitVote(p, pubkeys, sig, headerHash, height, viewID) +} + +func (a threadSafeDeciderImpl) SignersCount(phase Phase) int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.SignersCount(phase) +} + +func (a threadSafeDeciderImpl) reset(phases []Phase) { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.reset(phases) +} + +func (a threadSafeDeciderImpl) ReadBallot(p Phase, pubkey bls.SerializedPublicKey) *votepower.Ballot { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.ReadBallot(p, pubkey) +} + +func (a threadSafeDeciderImpl) TwoThirdsSignersCount() int64 { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.TwoThirdsSignersCount() +} + +func (a threadSafeDeciderImpl) AggregateVotes(p Phase) *bls_core.Sign { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.AggregateVotes(p) +} + +func (a threadSafeDeciderImpl) SetVoters(subCommittee *shard.Committee, epoch *big.Int) (*TallyResult, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.SetVoters(subCommittee, epoch) +} + +func (a threadSafeDeciderImpl) Policy() Policy { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.Policy() +} + +func (a threadSafeDeciderImpl) AddNewVote(p Phase, pubkeys []*bls.PublicKeyWrapper, sig *bls_core.Sign, headerHash common.Hash, height, viewID uint64) (*votepower.Ballot, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.AddNewVote(p, pubkeys, sig, headerHash, height, viewID) +} + +func (a threadSafeDeciderImpl) IsQuorumAchievedByMask(mask *bls.Mask) bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsQuorumAchievedByMask(mask) +} + +func (a threadSafeDeciderImpl) QuorumThreshold() numeric.Dec { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.QuorumThreshold() +} + +func (a threadSafeDeciderImpl) IsAllSigsCollected() bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsAllSigsCollected() +} + +func (a threadSafeDeciderImpl) ResetPrepareAndCommitVotes() { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.ResetPrepareAndCommitVotes() +} + +func (a threadSafeDeciderImpl) ResetViewChangeVotes() { + a.mu.Lock() + defer a.mu.Unlock() + a.decider.ResetViewChangeVotes() +} + +func (a threadSafeDeciderImpl) CurrentTotalPower(p Phase) (*numeric.Dec, error) { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.CurrentTotalPower(p) +} + +func (a threadSafeDeciderImpl) IsQuorumAchieved(p Phase) bool { + a.mu.Lock() + defer a.mu.Unlock() + return a.decider.IsQuorumAchieved(p) +} diff --git a/consensus/threshold.go b/consensus/threshold.go index e611eaedcd..339f6d2a7e 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -57,7 +57,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { continue } - if _, err := consensus.Decider.AddNewVote( + if _, err := consensus.decider.AddNewVote( quorum.Commit, []*bls.PublicKeyWrapper{key.Pub}, key.Pri.SignHash(commitPayload), diff --git a/consensus/validator.go b/consensus/validator.go index 891fe0c035..fa5cdac921 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -199,12 +199,12 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { // check validity of prepared signature blockHash := recvMsg.BlockHash - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return } - if !consensus.Decider.IsQuorumAchievedByMask(mask) { + if !consensus.decider.IsQuorumAchievedByMask(mask) { consensus.getLogger().Warn().Msgf("[OnPrepared] Quorum Not achieved.") return } @@ -335,7 +335,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { return } - aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.Decider.Participants()) + aggSig, mask, err := chain.DecodeSigBitmap(sigBytes, bitmap, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") return diff --git a/consensus/view_change.go b/consensus/view_change.go index d03ae5a13a..1171b073e0 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -187,7 +187,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com // it can still sync with other validators. if curHeader.IsLastBlockInEpoch() { consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch") - lastLeaderPubKey = consensus.Decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) + lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch)) } } } @@ -204,18 +204,18 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com var next *bls.PublicKeyWrapper if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) { if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) { - wasFound, next = consensus.Decider.NthNextValidator( + wasFound, next = consensus.decider.NthNextValidator( committee.Slots, lastLeaderPubKey, gap) } else { - wasFound, next = consensus.Decider.NthNextHmy( + wasFound, next = consensus.decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) } } else { - wasFound, next = consensus.Decider.NthNextHmy( + wasFound, next = consensus.decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) @@ -281,7 +281,7 @@ func (consensus *Consensus) startViewChange() { defer consensus.consensusTimeout[timeoutViewChange].Start() // update the dictionary key if the viewID is first time received - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members) // init my own payload @@ -386,10 +386,10 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { + if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { consensus.getLogger().Info(). - Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). - Int64("need", consensus.Decider.TwoThirdsSignersCount()). + Int64("have", consensus.decider.SignersCount(quorum.ViewChange)). + Int64("need", consensus.decider.TwoThirdsSignersCount()). Interface("SenderPubkeys", recvMsg.SenderPubkeys). Str("newLeaderKey", newLeaderKey.Bytes.Hex()). Msg("[onViewChange] Received Enough View Change Messages") @@ -404,7 +404,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { senderKey := recvMsg.SenderPubkeys[0] // update the dictionary key if the viewID is first time received - members := consensus.Decider.Participants() + members := consensus.decider.Participants() consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members) // do it once only per viewID/Leader @@ -420,7 +420,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } - err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.Decider, recvMsg, consensus.verifyBlock) + err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.decider, recvMsg, consensus.verifyBlock) if err != nil { consensus.getLogger().Error().Err(err). Uint64("viewID", recvMsg.ViewID). @@ -431,7 +431,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { } // received enough view change messages, change state to normal consensus - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { + if consensus.decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { // no previous prepared message, go straight to normal mode // and start proposing new block if consensus.vc.IsM1PayloadEmpty() { @@ -495,7 +495,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { } m3Mask := recvMsg.M3Bitmap - if !consensus.Decider.IsQuorumAchievedByMask(m3Mask) { + if !consensus.decider.IsQuorumAchievedByMask(m3Mask) { consensus.getLogger().Warn(). Msgf("[onNewView] Quorum Not achieved") return @@ -507,7 +507,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { // m1 is not empty, check it's valid blockHash := recvMsg.Payload[:32] - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.decider.Participants()) if err != nil { consensus.getLogger().Error().Err(err). Msg("[onNewView] ReadSignatureBitmapPayload Failed") @@ -584,5 +584,5 @@ func (consensus *Consensus) resetViewChangeState() { Msg("[ResetViewChangeState] Resetting view change state") consensus.current.SetMode(Normal) consensus.vc.Reset() - consensus.Decider.ResetViewChangeVotes() + consensus.decider.ResetViewChangeVotes() } diff --git a/consensus/view_change_test.go b/consensus/view_change_test.go index 96d8fbc865..bbc6999445 100644 --- a/consensus/view_change_test.go +++ b/consensus/view_change_test.go @@ -94,7 +94,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { _, _, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) - assert.Equal(t, int64(0), consensus.Decider.ParticipantsCount()) + assert.Equal(t, int64(0), consensus.Decider().ParticipantsCount()) blsKeys := []*bls_core.PublicKey{} wrappedBLSKeys := []bls.PublicKeyWrapper{} @@ -111,8 +111,8 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { wrappedBLSKeys = append(wrappedBLSKeys, wrapped) } - consensus.Decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) - assert.Equal(t, keyCount, consensus.Decider.ParticipantsCount()) + consensus.Decider().UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) + assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount()) consensus.LeaderPubKey = &wrappedBLSKeys[0] nextKey := consensus.getNextLeaderKey(uint64(1), nil) diff --git a/node/api.go b/node/api.go index ef76079f1e..e3862f510c 100644 --- a/node/api.go +++ b/node/api.go @@ -177,7 +177,7 @@ func (node *Node) GetConfig() rpc_common.Config { // GetLastSigningPower get last signed power func (node *Node) GetLastSigningPower() (float64, error) { - power, err := node.Consensus.Decider.CurrentTotalPower(quorum.Commit) + power, err := node.Consensus.Decider().CurrentTotalPower(quorum.Commit) if err != nil { return 0, err } diff --git a/node/node.go b/node/node.go index df766e52ee..d815f86ece 100644 --- a/node/node.go +++ b/node/node.go @@ -655,7 +655,7 @@ func validateShardBoundMessage(consensus *consensus.Consensus, peer libp2p_peer. return nil, nil, true, errors.WithStack(shard.ErrValidNotInCommittee) } } else { - count := consensus.Decider.ParticipantsCount() + count := consensus.Decider().ParticipantsCount() if (count+7)>>3 != int64(len(senderBitmap)) { nodeConsensusMessageCounterVec.With(prometheus.Labels{"type": "invalid_participant_count"}).Inc() return nil, nil, true, errors.WithStack(errWrongSizeOfBitmap) diff --git a/node/node_explorer.go b/node/node_explorer.go index ce1b0a2445..1e4a4010a0 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -53,7 +53,7 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag return err } - if !node.Consensus.Decider.IsQuorumAchievedByMask(mask) { + if !node.Consensus.Decider().IsQuorumAchievedByMask(mask) { utils.Logger().Error().Msg("[Explorer] not have enough signature power") return nil }