Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed decider concurrent map access. #4610

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type DownloadAsync interface {

// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
Decider quorum.Decider
decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
Expand Down Expand Up @@ -200,7 +200,9 @@ func (consensus *Consensus) BlocksNotSynchronized(reason string) {

// VdfSeedSize returns the number of VRFs for VDF computation
func (consensus *Consensus) VdfSeedSize() int {
return int(consensus.Decider.ParticipantsCount()) * 2 / 3
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return int(consensus.decider.ParticipantsCount()) * 2 / 3
}

// GetPublicKeys returns the public keys
Expand Down Expand Up @@ -275,7 +277,7 @@ func New(
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
Expand Down Expand Up @@ -322,6 +324,10 @@ func (consensus *Consensus) Registry() *registry.Registry {
return consensus.registry
}

func (consensus *Consensus) Decider() quorum.Decider {
return quorum.NewThreadSafeDecider(consensus.decider, consensus.mutex)
}

// InitConsensusWithValidators initialize shard state
// from latest epoch and update committee pub
// keys for consensus
Expand Down
26 changes: 13 additions & 13 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
}

func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.Decider.UpdateParticipants(pubKeys, allowlist)
consensus.decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
consensus.getLogger().Info().
Expand All @@ -91,7 +91,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
Msg("Member")
}

allKeys := consensus.Decider.Participants()
allKeys := consensus.decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info().
Expand All @@ -115,7 +115,7 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
if !consensus.isViewChangingMode() {
consensus.resetViewChangeState()
}
return consensus.Decider.ParticipantsCount()
return consensus.decider.ParticipantsCount()
}

// Sign on the hash of the message
Expand Down Expand Up @@ -144,7 +144,7 @@ func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members)
Expand All @@ -160,7 +160,7 @@ func (consensus *Consensus) resetState() {

consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.Decider.ResetPrepareAndCommitVotes()
consensus.decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear()
}
Expand All @@ -179,7 +179,7 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe
}

func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
return consensus.Decider.IndexOf(pubKey) != -1
return consensus.decider.IndexOf(pubKey) != -1
}

// SetMode sets the mode of consensus
Expand Down Expand Up @@ -271,7 +271,7 @@ func (consensus *Consensus) setBlockNum(blockNum uint64) {
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock()
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
consensus.mutex.RUnlock()
return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
}
Expand Down Expand Up @@ -334,12 +334,12 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake
consensus.decider.Policy() != quorum.SuperMajorityStake

// Only happens once, the flip-over to a new Decider policy
if isFirstTimeStaking || haventUpdatedDecider {
decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID)
consensus.Decider = decider
consensus.decider = decider
}

var committeeToSet *shard.Committee
Expand Down Expand Up @@ -412,7 +412,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())

// Update voters in the committee
if _, err := consensus.Decider.SetVoters(
if _, err := consensus.decider.SetVoters(
committeeToSet, epochToSet,
); err != nil {
consensus.getLogger().Error().
Expand Down Expand Up @@ -582,7 +582,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
return errGetPreparedBlock
}

aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants())
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
if err != nil {
return errReadBitmapPayload
}
Expand All @@ -606,7 +606,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
continue
}

if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),
Expand All @@ -628,7 +628,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()

Expand Down
3 changes: 1 addition & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestConsensusInitialization(t *testing.T) {
host, multiBLSPrivateKey, consensus, decider, err := GenerateConsensusForTesting()
host, multiBLSPrivateKey, consensus, _, err := GenerateConsensusForTesting()
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
Expand All @@ -30,7 +30,6 @@ func TestConsensusInitialization(t *testing.T) {
expectedTimeouts[timeoutViewChange] = viewChangeDuration
expectedTimeouts[timeoutBootstrap] = bootstrapDuration

assert.Equal(t, decider, consensus.Decider)
assert.Equal(t, host, consensus.host)
assert.Equal(t, messageSender, consensus.msgSender)

Expand Down
12 changes: 6 additions & 6 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
case t == msg_pb.MessageType_VIEWCHANGE:
fbftMsg, err = ParseViewChangeMessage(msg)
case t == msg_pb.MessageType_NEWVIEW:
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
fbftMsg, err = ParseNewViewMessage(msg, members)
default:
fbftMsg, err = consensus.parseFBFTMessage(msg)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
}

func (consensus *Consensus) finalCommit() {
numCommits := consensus.Decider.SignersCount(quorum.Commit)
numCommits := consensus.decider.SignersCount(quorum.Commit)

consensus.getLogger().Info().
Int64("NumCommits", numCommits).
Expand Down Expand Up @@ -441,7 +441,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
Int("numTxs", len(newBlock.Transactions())).
Int("numStakingTxs", len(newBlock.StakingTransactions())).
Time("startTime", startTime).
Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Int64("publicKeys", consensus.decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock)
})
Expand Down Expand Up @@ -741,16 +741,16 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) *bls.PublicKeyWrapper {

for i := 0; i < len(committee.Slots); i++ {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, offset)
wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset)
} else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
}
if !wasFound {
utils.Logger().Error().Msg("Failed to get next leader")
// Seems like nothing we can do here.
return nil
}
members := consensus.Decider.Participants()
members := consensus.decider.Participants()
mask := bls.NewMask(members)
skipped := 0
for i := 0; i < blocksCountAliveness; i++ {
Expand Down
4 changes: 2 additions & 2 deletions consensus/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (consensus *Consensus) construct(
)
} else {
// TODO: use a persistent bitmap to report bitmap
mask := bls.NewMask(consensus.Decider.Participants())
mask := bls.NewMask(consensus.decider.Participants())
for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (consensus *Consensus) construct(
func (consensus *Consensus) constructQuorumSigAndBitmap(p quorum.Phase) []byte {
buffer := bytes.Buffer{}
// 96 bytes aggregated signature
aggSig := consensus.Decider.AggregateVotes(p)
aggSig := consensus.decider.AggregateVotes(p)
buffer.Write(aggSig.Serialize())
// Bitmap
if p == quorum.Prepare {
Expand Down
4 changes: 2 additions & 2 deletions consensus/construct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ func TestConstructPreparedMessage(test *testing.T) {
validatorKey := bls.SerializedPublicKey{}
validatorKey.FromLibBLSPublicKey(validatorPubKey)
validatorKeyWrapper := bls.PublicKeyWrapper{Object: validatorPubKey, Bytes: validatorKey}
consensus.Decider.AddNewVote(
consensus.Decider().AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{&leaderKeyWrapper},
leaderPriKey.Sign(message),
common.BytesToHash(consensus.blockHash[:]),
consensus.BlockNum(),
consensus.GetCurBlockViewID(),
)
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.Decider().AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{&validatorKeyWrapper},
validatorPriKey.Sign(message),
Expand Down
2 changes: 1 addition & 1 deletion consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
if consensus.couldThisBeADoubleSigner(recvMsg) {
addrSet := map[common.Address]struct{}{}
for _, pubKey2 := range recvMsg.SenderPubkeys {
if alreadyCastBallot := consensus.Decider.ReadBallot(
if alreadyCastBallot := consensus.decider.ReadBallot(
quorum.Commit, pubKey2.Bytes,
); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {
Expand Down
26 changes: 13 additions & 13 deletions consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (consensus *Consensus) announce(block *types.Block) {
continue
}

if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Prepare,
[]*bls.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(consensus.blockHash[:]),
Expand Down Expand Up @@ -112,7 +112,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
prepareBitmap := consensus.prepareBitmap
// proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Prepare, signer.Bytes)
signed := consensus.decider.ReadBallot(quorum.Prepare, signer.Bytes)
if signed != nil {
consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()).
Expand All @@ -121,14 +121,14 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
}
}

if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// already have enough signatures
consensus.getLogger().Debug().
Interface("validatorPubKeys", recvMsg.SenderPubkeys).
Msg("[OnPrepare] Received Additional Prepare Message")
return
}
signerCount := consensus.Decider.SignersCount(quorum.Prepare)
signerCount := consensus.decider.SignersCount(quorum.Prepare)
//// Read - End

// Check BLS signature for the multi-sig
Expand Down Expand Up @@ -161,11 +161,11 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {

consensus.getLogger().Debug().
Int64("NumReceivedSoFar", signerCount).
Int64("PublicKeys", consensus.Decider.ParticipantsCount()).
Int64("PublicKeys", consensus.decider.ParticipantsCount()).
Msg("[OnPrepare] Received New Prepare Signature")

//// Write - Start
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Prepare, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID,
Expand All @@ -181,7 +181,7 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
//// Write - End

//// Read - Start
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
if consensus.decider.IsQuorumAchieved(quorum.Prepare) {
// NOTE Let it handle its own logs
if err := consensus.didReachPrepareQuorum(); err != nil {
return
Expand All @@ -199,7 +199,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
}
// proceed only when the message is not received before
for _, signer := range recvMsg.SenderPubkeys {
signed := consensus.Decider.ReadBallot(quorum.Commit, signer.Bytes)
signed := consensus.decider.ReadBallot(quorum.Commit, signer.Bytes)
if signed != nil {
consensus.getLogger().Debug().
Str("validatorPubKey", signer.Bytes.Hex()).
Expand All @@ -211,9 +211,9 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
commitBitmap := consensus.commitBitmap

// has to be called before verifying signature
quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
quorumWasMet := consensus.decider.IsQuorumAchieved(quorum.Commit)

signerCount := consensus.Decider.SignersCount(quorum.Commit)
signerCount := consensus.decider.SignersCount(quorum.Commit)
//// Read - End

// Verify the signature on commitPayload is correct
Expand Down Expand Up @@ -267,7 +267,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
return
}
*/
if _, err := consensus.Decider.AddNewVote(
if _, err := consensus.decider.AddNewVote(
quorum.Commit, recvMsg.SenderPubkeys,
&sign, recvMsg.BlockHash,
recvMsg.BlockNum, recvMsg.ViewID,
Expand All @@ -285,15 +285,15 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
//// Read - Start
viewID := consensus.getCurBlockViewID()

if consensus.Decider.IsAllSigsCollected() {
if consensus.decider.IsAllSigsCollected() {
logger.Info().Msg("[OnCommit] 100% Enough commits received")
consensus.finalCommit()

consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED)
return
}

quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
quorumIsMet := consensus.decider.IsQuorumAchieved(quorum.Commit)
//// Read - End

if !quorumWasMet && quorumIsMet {
Expand Down
Loading