Skip to content

Commit

Permalink
Lock free consensus methods (#4739)
Browse files Browse the repository at this point in the history
* Lock free consensus methods.
  • Loading branch information
Frozen authored Sep 3, 2024
1 parent 016d979 commit 4718a1c
Show file tree
Hide file tree
Showing 19 changed files with 304 additions and 298 deletions.
1 change: 0 additions & 1 deletion cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
Msg("Init Blockchain")

currentNode.Consensus.Registry().SetNodeConfig(currentNode.NodeConfig)
currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing
// update consensus information based on the blockchain
currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation())
currentConsensus.NextBlockDue = time.Now()
Expand Down
4 changes: 2 additions & 2 deletions consensus/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
Str("logMsgSenderKey", logMsgs[0].SenderPubkeys[0].Bytes.Hex()).
Str("logMsgBlockHash", logMsgs[0].BlockHash.Hex()).
Str("recvMsg", recvMsg.String()).
Str("LeaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("LeaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[OnAnnounce] Leader is malicious")
if consensus.isViewChangingMode() {
consensus.getLogger().Debug().Msg(
Expand All @@ -96,7 +96,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
}
}
consensus.getLogger().Debug().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[OnAnnounce] Announce message received again")
}
return consensus.isRightBlockNumCheck(recvMsg)
Expand Down
16 changes: 6 additions & 10 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/harmony-one/abool"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
Expand Down Expand Up @@ -84,7 +85,7 @@ type Consensus struct {
// private/public keys of current node
priKey multibls.PrivateKeys
// the publickey of leader
LeaderPubKey *bls.PublicKeyWrapper
leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper
// blockNum: the next blockNumber that FBFT is going to agree on,
// should be equal to the blockNumber of next block
blockNum uint64
Expand All @@ -104,9 +105,6 @@ type Consensus struct {
readySignal chan Proposal
// Channel to send full commit signatures to finish new block proposal
commitSigChannel chan []byte
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
Expand Down Expand Up @@ -230,17 +228,15 @@ func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper {
}

func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper {
return consensus.LeaderPubKey
return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey))
}

func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.setLeaderPubKey(pub)
}

func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
consensus.LeaderPubKey = pub
atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub))
}

func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
Expand All @@ -259,7 +255,7 @@ func (consensus *Consensus) getLeaderPrivateKey(leaderKey *bls_core.PublicKey) (

// getConsensusLeaderPrivateKey returns consensus leader private key if node is the leader
func (consensus *Consensus) getConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapper, error) {
return consensus.getLeaderPrivateKey(consensus.LeaderPubKey.Object)
return consensus.getLeaderPrivateKey(consensus.getLeaderPubKey().Object)
}

func (consensus *Consensus) IsBackup() bool {
Expand Down Expand Up @@ -287,7 +283,7 @@ func New(
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
current: NewState(Normal),
decider: Decider,
registry: registry,
MinPeers: minPeers,
Expand Down
10 changes: 2 additions & 8 deletions consensus/consensus_msg_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa
sender.Retry(&msgRetry)
}()
}
// MessageSender lays inside consensus, but internally calls consensus public api.
// Tt would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
return sender.host.SendMessageToGroups(groups, p2pMsg)
}

// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
Expand All @@ -89,10 +86,7 @@ func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_p

// SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
// MessageSender lays inside consensus, but internally calls consensus public api.
// It would be deadlock if run in current thread.
go sender.host.SendMessageToGroups(groups, p2pMsg)
return nil
return sender.host.SendMessageToGroups(groups, p2pMsg)
}

// Retry will retry the consensus message for <RetryTimes> times.
Expand Down
33 changes: 14 additions & 19 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi

allKeys := consensus.decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.setLeaderPubKey(&allKeys[0])
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
Str("info", consensus.getLeaderPubKey().Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
Expand Down Expand Up @@ -172,8 +172,6 @@ func (consensus *Consensus) resetState() {

// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isValidatorInCommittee(pubKey)
}

Expand Down Expand Up @@ -207,9 +205,8 @@ func (consensus *Consensus) SetIsBackup(isBackup bool) {
}

// Mode returns the mode of consensus
// Method is thread safe.
func (consensus *Consensus) Mode() Mode {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.mode()
}

Expand Down Expand Up @@ -239,11 +236,11 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys")
}
consensus.LeaderPubKey = msg.SenderPubkeys[0]
consensus.setLeaderPubKey(msg.SenderPubkeys[0])
consensus.IgnoreViewIDCheck.UnSet()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()).
Msg("[checkViewID] Start consensus timer")
return nil
} else if msg.ViewID > consensus.getCurBlockViewID() {
Expand Down Expand Up @@ -399,7 +396,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
}

// update public keys in the committee
oldLeader := consensus.LeaderPubKey
oldLeader := consensus.getLeaderPubKey()
pubKeys, _ := committeeToSet.BLSPublicKeys()

consensus.getLogger().Info().
Expand Down Expand Up @@ -436,7 +433,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
consensus.getLogger().Info().
Str("leaderPubKey", leaderPubKey.Bytes.Hex()).
Msgf("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain, blocknum: %d", curHeader.NumberU64())
consensus.LeaderPubKey = leaderPubKey
consensus.setLeaderPubKey(leaderPubKey)
}
}

Expand All @@ -453,8 +450,8 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
}

// If the leader changed and I myself become the leader
if (oldLeader != nil && consensus.LeaderPubKey != nil &&
!consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
if (oldLeader != nil && consensus.getLeaderPubKey() != nil &&
!consensus.getLeaderPubKey().Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
go func() {
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Expand All @@ -474,20 +471,19 @@ func (consensus *Consensus) updateConsensusInformation() Mode {

// IsLeader checks whether the node is a leader by comparing the public key of
// the node with the leader public key.
// Method is thread safe
func (consensus *Consensus) IsLeader() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()

return consensus.isLeader()
}

// isLeader checks whether the node is a leader by comparing the public key of
// the node with the leader public key. This function assumes it runs under lock.
func (consensus *Consensus) isLeader() bool {
if consensus.LeaderPubKey == nil {
pub := consensus.getLeaderPubKey()
if pub == nil {
return false
}
obj := consensus.LeaderPubKey.Object
obj := pub.Object
for _, key := range consensus.priKey {
if key.Pub.Object.IsEqual(obj) {
return true
Expand Down Expand Up @@ -624,12 +620,11 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
}

// NumSignaturesIncludedInBlock returns the number of signatures included in the block
// Method is thread safe.
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()

// TODO(audit): do not reconstruct the Mask
mask := bls.NewMask(members)
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
state := State{mode: Normal}
state := NewState(Normal)

timeouts := createTimeout()
expectedTimeouts := make(map[TimeoutType]time.Duration)
Expand Down
11 changes: 4 additions & 7 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ const (
CommitSigSenderTimeout = 10 * time.Second
)

// IsViewChangingMode return true if curernt mode is viewchanging
// IsViewChangingMode return true if current mode is viewchanging.
// Method is thread safe.
func (consensus *Consensus) IsViewChangingMode() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isViewChangingMode()
}

Expand All @@ -68,7 +67,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p
// Do easier check before signature check
if msg.Type == msg_pb.MessageType_ANNOUNCE || msg.Type == msg_pb.MessageType_PREPARED || msg.Type == msg_pb.MessageType_COMMITTED {
// Only validator needs to check whether the message is from the correct leader
if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) &&
if !bytes.Equal(senderKey[:], consensus.getLeaderPubKey().Bytes[:]) &&
consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() {
return errSenderPubKeyNotLeader
}
Expand Down Expand Up @@ -666,9 +665,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess
}

consensus.FinishFinalityCount()
go func() {
consensus.PostConsensusJob(blk)
}()
consensus.PostConsensusProcessing(blk)
consensus.setupForNewConsensus(blk, committedMsg)
utils.Logger().Info().Uint64("blockNum", blk.NumberU64()).
Str("hash", blk.Header().Hash().Hex()).
Expand Down
12 changes: 4 additions & 8 deletions consensus/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ func (consensus *Consensus) getConsensusPhase() string {

// GetConsensusMode returns the current mode of the consensus
func (consensus *Consensus) GetConsensusMode() string {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.current.mode.String()
return consensus.current.Mode().String()
}

// GetCurBlockViewID returns the current view ID of the consensus
// Method is thread safe.
func (consensus *Consensus) GetCurBlockViewID() uint64 {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.getCurBlockViewID()
}

Expand All @@ -31,10 +28,9 @@ func (consensus *Consensus) getCurBlockViewID() uint64 {
return consensus.current.GetCurBlockViewID()
}

// GetViewChangingID returns the current view changing ID of the consensus
// GetViewChangingID returns the current view changing ID of the consensus.
// Method is thread safe.
func (consensus *Consensus) GetViewChangingID() uint64 {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.current.GetViewChangingID()
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
break
}

leaderAddr, err := subComm.AddressForBLSKey(consensus.LeaderPubKey.Bytes)
leaderAddr, err := subComm.AddressForBLSKey(consensus.getLeaderPubKey().Bytes)
if err != nil {
consensus.getLogger().Err(err).Str("msg", recvMsg.String()).
Msg("could not find address for leader bls key")
Expand Down
2 changes: 1 addition & 1 deletion consensus/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package consensus
import "fmt"

// Mode is the current mode of the consensus.
type Mode byte
type Mode uint32

const (
// Normal ..
Expand Down
Loading

0 comments on commit 4718a1c

Please sign in to comment.