diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 34cfd792b7..9629bee0aa 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -887,7 +887,6 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi Msg("Init Blockchain") currentNode.Consensus.Registry().SetNodeConfig(currentNode.NodeConfig) - currentConsensus.PostConsensusJob = currentNode.PostConsensusProcessing // update consensus information based on the blockchain currentConsensus.SetMode(currentConsensus.UpdateConsensusInformation()) currentConsensus.NextBlockDue = time.Now() diff --git a/consensus/checks.go b/consensus/checks.go index b4e1d12075..739c19dd6c 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -85,7 +85,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool { Str("logMsgSenderKey", logMsgs[0].SenderPubkeys[0].Bytes.Hex()). Str("logMsgBlockHash", logMsgs[0].BlockHash.Hex()). Str("recvMsg", recvMsg.String()). - Str("LeaderKey", consensus.LeaderPubKey.Bytes.Hex()). + Str("LeaderKey", consensus.getLeaderPubKey().Bytes.Hex()). Msg("[OnAnnounce] Leader is malicious") if consensus.isViewChangingMode() { consensus.getLogger().Debug().Msg( @@ -96,7 +96,7 @@ func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool { } } consensus.getLogger().Debug(). - Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()). + Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()). 
Msg("[OnAnnounce] Announce message received again") } return consensus.isRightBlockNumCheck(recvMsg) diff --git a/consensus/consensus.go b/consensus/consensus.go index a281a8ae7c..1a649e5ee9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/harmony-one/abool" bls_core "github.com/harmony-one/bls/ffi/go/bls" @@ -84,7 +85,7 @@ type Consensus struct { // private/public keys of current node priKey multibls.PrivateKeys // the publickey of leader - LeaderPubKey *bls.PublicKeyWrapper + leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper // blockNum: the next blockNumber that FBFT is going to agree on, // should be equal to the blockNumber of next block blockNum uint64 @@ -104,9 +105,6 @@ type Consensus struct { readySignal chan Proposal // Channel to send full commit signatures to finish new block proposal commitSigChannel chan []byte - // The post-consensus job func passed from Node object - // Called when consensus on a new block is done - PostConsensusJob func(*types.Block) error // verified block to state sync broadcast VerifiedNewBlock chan *types.Block // Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf @@ -230,17 +228,15 @@ func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper { } func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper { - return consensus.LeaderPubKey + return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey)) } func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() consensus.setLeaderPubKey(pub) } func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) { - consensus.LeaderPubKey = pub + atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub)) } func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys { @@ -259,7 +255,7 @@ func (consensus 
*Consensus) getLeaderPrivateKey(leaderKey *bls_core.PublicKey) ( // getConsensusLeaderPrivateKey returns consensus leader private key if node is the leader func (consensus *Consensus) getConsensusLeaderPrivateKey() (*bls.PrivateKeyWrapper, error) { - return consensus.getLeaderPrivateKey(consensus.LeaderPubKey.Object) + return consensus.getLeaderPrivateKey(consensus.getLeaderPubKey().Object) } func (consensus *Consensus) IsBackup() bool { @@ -287,7 +283,7 @@ func New( ShardID: shard, fBFTLog: NewFBFTLog(), phase: FBFTAnnounce, - current: State{mode: Normal}, + current: NewState(Normal), decider: Decider, registry: registry, MinPeers: minPeers, diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go index ad32b91ce3..b18d4e97ab 100644 --- a/consensus/consensus_msg_sender.go +++ b/consensus/consensus_msg_sender.go @@ -67,10 +67,7 @@ func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.Messa sender.Retry(&msgRetry) }() } - // MessageSender lays inside consensus, but internally calls consensus public api. - // Tt would be deadlock if run in current thread. - go sender.host.SendMessageToGroups(groups, p2pMsg) - return nil + return sender.host.SendMessageToGroups(groups, p2pMsg) } // DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries. @@ -89,10 +86,7 @@ func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_p // SendWithoutRetry sends message without retry logic. func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error { - // MessageSender lays inside consensus, but internally calls consensus public api. - // It would be deadlock if run in current thread. - go sender.host.SendMessageToGroups(groups, p2pMsg) - return nil + return sender.host.SendMessageToGroups(groups, p2pMsg) } // Retry will retry the consensus message for times. 
diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index e92e242765..e64f3007ab 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -92,9 +92,9 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi allKeys := consensus.decider.Participants() if len(allKeys) != 0 { - consensus.LeaderPubKey = &allKeys[0] + consensus.setLeaderPubKey(&allKeys[0]) consensus.getLogger().Info(). - Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys") + Str("info", consensus.getLeaderPubKey().Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys") } else { consensus.getLogger().Error(). Msg("[UpdatePublicKeys] Participants is empty") @@ -172,8 +172,6 @@ func (consensus *Consensus) resetState() { // IsValidatorInCommittee returns whether the given validator BLS address is part of my committee func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.isValidatorInCommittee(pubKey) } @@ -207,9 +205,8 @@ func (consensus *Consensus) SetIsBackup(isBackup bool) { } // Mode returns the mode of consensus +// Method is thread safe. func (consensus *Consensus) Mode() Mode { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.mode() } @@ -239,11 +236,11 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error { if !msg.HasSingleSender() { return errors.New("Leader message can not have multiple sender keys") } - consensus.LeaderPubKey = msg.SenderPubkeys[0] + consensus.setLeaderPubKey(msg.SenderPubkeys[0]) consensus.IgnoreViewIDCheck.UnSet() consensus.consensusTimeout[timeoutConsensus].Start() consensus.getLogger().Info(). - Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()). + Str("leaderKey", consensus.getLeaderPubKey().Bytes.Hex()). 
Msg("[checkViewID] Start consensus timer") return nil } else if msg.ViewID > consensus.getCurBlockViewID() { @@ -399,7 +396,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { } // update public keys in the committee - oldLeader := consensus.LeaderPubKey + oldLeader := consensus.getLeaderPubKey() pubKeys, _ := committeeToSet.BLSPublicKeys() consensus.getLogger().Info(). @@ -436,7 +433,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { consensus.getLogger().Info(). Str("leaderPubKey", leaderPubKey.Bytes.Hex()). Msgf("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain, blocknum: %d", curHeader.NumberU64()) - consensus.LeaderPubKey = leaderPubKey + consensus.setLeaderPubKey(leaderPubKey) } } @@ -453,8 +450,8 @@ func (consensus *Consensus) updateConsensusInformation() Mode { } // If the leader changed and I myself become the leader - if (oldLeader != nil && consensus.LeaderPubKey != nil && - !consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() { + if (oldLeader != nil && consensus.getLeaderPubKey() != nil && + !consensus.getLeaderPubKey().Object.IsEqual(oldLeader.Object)) && consensus.isLeader() { go func() { consensus.GetLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). @@ -474,20 +471,19 @@ func (consensus *Consensus) updateConsensusInformation() Mode { // IsLeader check if the node is a leader or not by comparing the public key of // the node with the leader public key +// Method is thread safe func (consensus *Consensus) IsLeader() bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.isLeader() } // isLeader check if the node is a leader or not by comparing the public key of // the node with the leader public key. This function assume it runs under lock. 
func (consensus *Consensus) isLeader() bool { - if consensus.LeaderPubKey == nil { + pub := consensus.getLeaderPubKey() + if pub == nil { return false } - obj := consensus.LeaderPubKey.Object + obj := pub.Object for _, key := range consensus.priKey { if key.Pub.Object.IsEqual(obj) { return true @@ -624,12 +620,11 @@ func (consensus *Consensus) selfCommit(payload []byte) error { } // NumSignaturesIncludedInBlock returns the number of signatures included in the block +// Method is thread safe. func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 { count := uint32(0) - consensus.mutex.Lock() members := consensus.decider.Participants() pubKeys := consensus.getPublicKeys() - consensus.mutex.Unlock() // TODO(audit): do not reconstruct the Mask mask := bls.NewMask(members) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 2fe524fdf8..0f8b0c724b 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -22,7 +22,7 @@ func TestConsensusInitialization(t *testing.T) { assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} - state := State{mode: Normal} + state := NewState(Normal) timeouts := createTimeout() expectedTimeouts := make(map[TimeoutType]time.Duration) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 6caed6e504..dbc8e8bf3d 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -40,10 +40,9 @@ const ( CommitSigSenderTimeout = 10 * time.Second ) -// IsViewChangingMode return true if curernt mode is viewchanging +// IsViewChangingMode return true if current mode is viewchanging. +// Method is thread safe. 
func (consensus *Consensus) IsViewChangingMode() bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.isViewChangingMode() } @@ -68,7 +67,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p // Do easier check before signature check if msg.Type == msg_pb.MessageType_ANNOUNCE || msg.Type == msg_pb.MessageType_PREPARED || msg.Type == msg_pb.MessageType_COMMITTED { // Only validator needs to check whether the message is from the correct leader - if !bytes.Equal(senderKey[:], consensus.LeaderPubKey.Bytes[:]) && + if !bytes.Equal(senderKey[:], consensus.getLeaderPubKey().Bytes[:]) && consensus.current.Mode() == Normal && !consensus.IgnoreViewIDCheck.IsSet() { return errSenderPubKeyNotLeader } @@ -666,9 +665,7 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess } consensus.FinishFinalityCount() - go func() { - consensus.PostConsensusJob(blk) - }() + consensus.PostConsensusProcessing(blk) consensus.setupForNewConsensus(blk, committedMsg) utils.Logger().Info().Uint64("blockNum", blk.NumberU64()). Str("hash", blk.Header().Hash().Hex()). diff --git a/consensus/debug.go b/consensus/debug.go index a323e04ed6..c195305809 100644 --- a/consensus/debug.go +++ b/consensus/debug.go @@ -14,15 +14,12 @@ func (consensus *Consensus) getConsensusPhase() string { // GetConsensusMode returns the current mode of the consensus func (consensus *Consensus) GetConsensusMode() string { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.current.mode.String() + return consensus.current.Mode().String() } // GetCurBlockViewID returns the current view ID of the consensus +// Method is thread safe. 
func (consensus *Consensus) GetCurBlockViewID() uint64 { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.getCurBlockViewID() } @@ -31,10 +28,9 @@ func (consensus *Consensus) getCurBlockViewID() uint64 { return consensus.current.GetCurBlockViewID() } -// GetViewChangingID returns the current view changing ID of the consensus +// GetViewChangingID returns the current view changing ID of the consensus. +// Method is thread safe. func (consensus *Consensus) GetViewChangingID() uint64 { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() return consensus.current.GetViewChangingID() } diff --git a/consensus/double_sign.go b/consensus/double_sign.go index 144c67bff7..6ad2efa0f8 100644 --- a/consensus/double_sign.go +++ b/consensus/double_sign.go @@ -71,7 +71,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool { break } - leaderAddr, err := subComm.AddressForBLSKey(consensus.LeaderPubKey.Bytes) + leaderAddr, err := subComm.AddressForBLSKey(consensus.getLeaderPubKey().Bytes) if err != nil { consensus.getLogger().Err(err).Str("msg", recvMsg.String()). Msg("could not find address for leader bls key") diff --git a/consensus/enums.go b/consensus/enums.go index 41eafba863..7c4b28be25 100644 --- a/consensus/enums.go +++ b/consensus/enums.go @@ -3,7 +3,7 @@ package consensus import "fmt" // Mode is the current -type Mode byte +type Mode uint32 const ( // Normal .. 
diff --git a/consensus/post_processing.go b/consensus/post_processing.go
new file mode 100644
index 0000000000..384546f713
--- /dev/null
+++ b/consensus/post_processing.go
@@ -0,0 +1,253 @@
+package consensus
+
+import (
+	"math/rand"
+
+	proto_node "github.com/harmony-one/harmony/api/proto/node"
+	"github.com/harmony-one/harmony/block"
+	"github.com/harmony-one/harmony/core"
+	"github.com/harmony-one/harmony/core/types"
+	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
+	"github.com/harmony-one/harmony/internal/utils"
+	"github.com/harmony-one/harmony/p2p"
+	"github.com/harmony-one/harmony/shard"
+	"github.com/harmony-one/harmony/staking/availability"
+	"github.com/harmony-one/harmony/webhooks"
+)
+
+// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
+//  1. [leader] send new block to the client (beacon shard only)
+//  2. [leader] send cross shard tx receipts to destination shard
+//  3. [validator not in Listening mode] log the result, switch from Syncing to the
+//     mode returned by updateConsensusInformation, update the validator metrics
+//     and, for about 1% of the calls, do the same broadcasting as the leader
+//  4. re-broadcast the missing cross shard receipts queued in the cx pool
+//  5. post to the availability web hook, when one is configured, for every address
+//     resolved from consensus.getPublicKeys() whose computed signing is below threshold
+//
+// The returned error is non-nil only when the shard state of the block's epoch cannot
+// be read; a failed validator lookup is logged and ends the processing with nil.
+func (consensus *Consensus) PostConsensusProcessing(newBlock *types.Block) error {
+	if consensus.IsLeader() {
+		if IsRunningBeaconChain(consensus) {
+			// TODO: consider removing this and letting other nodes broadcast new blocks.
+			// But need to make sure there is at least 1 node that will do the job.
+			BroadcastNewBlock(consensus.host, newBlock, consensus.registry.GetNodeConfig())
+		}
+		BroadcastCXReceipts(newBlock, consensus)
+	} else {
+		if mode := consensus.mode(); mode != Listening {
+			numSignatures := consensus.NumSignaturesIncludedInBlock(newBlock)
+			utils.Logger().Info().
+				Uint64("blockNum", newBlock.NumberU64()).
+				Uint64("epochNum", newBlock.Epoch().Uint64()).
+				Uint64("ViewId", newBlock.Header().ViewID().Uint64()).
+				Str("blockHash", newBlock.Hash().String()).
+				Int("numTxns", len(newBlock.Transactions())).
+				Int("numStakingTxns", len(newBlock.StakingTransactions())).
+				Uint32("numSignatures", numSignatures).
+				Str("mode", mode.String()).
+				Msg("BINGO !!! Reached Consensus")
+			if consensus.mode() == Syncing {
+				mode = consensus.updateConsensusInformation()
+				utils.Logger().Info().Msgf("Switching to mode %s", mode)
+				consensus.setMode(mode)
+			}
+
+			consensus.UpdateValidatorMetrics(float64(numSignatures), float64(newBlock.NumberU64()))
+
+			// 1% of the validator also need to do broadcasting
+			rnd := rand.Intn(100)
+			if rnd < 1 {
+				// Beacon validators also broadcast new blocks to make sure beacon sync is strong.
+				if IsRunningBeaconChain(consensus) {
+					BroadcastNewBlock(consensus.host, newBlock, consensus.registry.GetNodeConfig())
+				}
+				BroadcastCXReceipts(newBlock, consensus)
+			}
+		}
+	}
+
+	// Broadcast client requested missing cross shard receipts if there is any
+	BroadcastMissingCXReceipts(consensus)
+
+	if h := consensus.registry.GetNodeConfig().WebHooks.Hooks; h != nil {
+		if h.Availability != nil {
+			shardState, err := consensus.Blockchain().ReadShardState(newBlock.Epoch())
+			if err != nil {
+				utils.Logger().Error().Err(err).
+					Int64("epoch", newBlock.Epoch().Int64()).
+					Uint32("shard-id", consensus.ShardID).
+					Msg("failed to read shard state")
+				return err
+			}
+
+			for _, addr := range consensus.Registry().GetAddressToBLSKey().GetAddresses(consensus.getPublicKeys(), shardState, newBlock.Epoch()) {
+				wrapper, err := consensus.Beaconchain().ReadValidatorInformation(addr)
+				if err != nil {
+					utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator info")
+					return nil
+				}
+				snapshot, err := consensus.Beaconchain().ReadValidatorSnapshot(addr)
+				if err != nil {
+					utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator snapshot")
+					return nil
+				}
+				computed := availability.ComputeCurrentSigning(
+					snapshot.Validator, wrapper,
+				)
+				lastBlockOfEpoch := shard.Schedule.EpochLastBlock(consensus.Beaconchain().CurrentBlock().Header().Epoch().Uint64())
+
+				computed.BlocksLeftInEpoch = lastBlockOfEpoch - consensus.Beaconchain().CurrentBlock().Header().Number().Uint64()
+
+				// Both lookups above return early on failure, so err is always nil
+				// here; gating on it would keep the hook from ever being posted.
+				if computed.IsBelowThreshold {
+					url := h.Availability.OnDroppedBelowThreshold
+					go func() {
+						webhooks.DoPost(url, computed)
+					}()
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// BroadcastNewBlock is called by consensus leader to sync new blocks with other clients/nodes.
+// NOTE: For now, just send to the client (basically not broadcasting)
+// TODO (lc): broadcast the new blocks to new nodes doing state sync
+func BroadcastNewBlock(host p2p.Host, newBlock *types.Block, nodeConfig *nodeconfig.ConfigType) {
+	groups := []nodeconfig.GroupID{nodeConfig.GetClientGroupID()}
+	utils.Logger().Info().
+		Msgf(
+			"broadcasting new block %d, group %s", newBlock.NumberU64(), groups[0],
+		)
+	msg := p2p.ConstructMessage(
+		proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}),
+	)
+	if err := host.SendMessageToGroups(groups, msg); err != nil {
+		utils.Logger().Warn().Err(err).Msg("cannot broadcast new block")
+	}
+}
+
+// IsRunningBeaconChain reports whether the given consensus runs on the beacon chain shard.
+func IsRunningBeaconChain(c *Consensus) bool {
+	return c.ShardID == shard.BeaconChainShardID
+}
+
+// BroadcastCXReceipts broadcasts cross shard receipts to corresponding
+// destination shards. The block's current commit sig payload is split into the
+// first 96 bytes (the signature) and the bitmap that follows them; nothing is
+// sent when the payload is not longer than 96 bytes.
+func BroadcastCXReceipts(newBlock *types.Block, consensus *Consensus) {
+	commitSigAndBitmap := newBlock.GetCurrentCommitSig()
+	//#### Read payload data from committed msg
+	if len(commitSigAndBitmap) <= 96 {
+		utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length")
+		return
+	}
+	commitSig := make([]byte, 96)
+	commitBitmap := make([]byte, len(commitSigAndBitmap)-96)
+	offset := 0
+	copy(commitSig[:], commitSigAndBitmap[offset:offset+96])
+	offset += 96
+	copy(commitBitmap[:], commitSigAndBitmap[offset:])
+	//#### END Read payload data from committed msg
+
+	epoch := newBlock.Header().Epoch()
+	shardingConfig := shard.Schedule.InstanceForEpoch(epoch)
+	shardNum := int(shardingConfig.NumShards())
+	myShardID := consensus.ShardID
+	utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")
+
+	for i := 0; i < shardNum; i++ {
+		if i == int(myShardID) {
+			continue
+		}
+		BroadcastCXReceiptsWithShardID(newBlock.Header(), commitSig, commitBitmap, uint32(i), consensus)
+	}
+}
+
+// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID.
+// Nothing is sent when no receipts are found for that shard or the merkle proof
+// cannot be obtained; a failed send is only logged.
+func BroadcastCXReceiptsWithShardID(block *block.Header, commitSig []byte, commitBitmap []byte, toShardID uint32, consensus *Consensus) {
+	myShardID := consensus.ShardID
+	utils.Logger().Debug().
+		Uint32("toShardID", toShardID).
+		Uint32("myShardID", myShardID).
+		Uint64("blockNum", block.NumberU64()).
+		Msg("[BroadcastCXReceiptsWithShardID]")
+
+	cxReceipts, err := consensus.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash())
+	if err != nil || len(cxReceipts) == 0 {
+		utils.Logger().Debug().Uint32("ToShardID", toShardID).
+			Int("numCXReceipts", len(cxReceipts)).
+			Msg("[CXMerkleProof] No receipts found for the destination shard")
+		return
+	}
+
+	merkleProof, err := consensus.Blockchain().CXMerkleProof(toShardID, block)
+	if err != nil {
+		utils.Logger().Warn().Err(err).
+			Uint32("ToShardID", toShardID).
+			Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof")
+		return
+	}
+
+	cxReceiptsProof := &types.CXReceiptsProof{
+		Receipts:     cxReceipts,
+		MerkleProof:  merkleProof,
+		Header:       block,
+		CommitSig:    commitSig,
+		CommitBitmap: commitBitmap,
+	}
+
+	groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID))
+	utils.Logger().Info().Uint32("ToShardID", toShardID).
+		Str("GroupID", string(groupID)).
+		Interface("cxp", cxReceiptsProof).
+		Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof ready. Sending CX receipts...")
+	// TODO ek – limit concurrency
+	msg := p2p.ConstructMessage(proto_node.ConstructCXReceiptsProof(cxReceiptsProof))
+	if err := consensus.GetHost().SendMessageToGroups([]nodeconfig.GroupID{groupID}, msg); err != nil {
+		utils.Logger().Warn().Err(err).
+			Uint32("ToShardID", toShardID).
+			Msg("[BroadcastCXReceiptsWithShardID] cannot send CX receipts")
+	}
+}
+
+// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request.
+// Entries whose block is known but whose next header is not available yet are put
+// back into the pool after it is cleared; entries with an unknown block are dropped.
+func BroadcastMissingCXReceipts(c *Consensus) {
+	var (
+		sendNextTime = make([]core.CxEntry, 0)
+		cxPool       = c.Registry().GetCxPool()
+		blockchain   = c.Blockchain()
+	)
+	it := cxPool.Pool().Iterator()
+	for entry := range it.C {
+		cxEntry := entry.(core.CxEntry)
+		toShardID := cxEntry.ToShardID
+		blk := blockchain.GetBlockByHash(cxEntry.BlockHash)
+		if blk == nil {
+			continue
+		}
+		blockNum := blk.NumberU64()
+		nextHeader := blockchain.GetHeaderByNumber(blockNum + 1)
+		if nextHeader == nil {
+			sendNextTime = append(sendNextTime, cxEntry)
+			continue
+		}
+		sig := nextHeader.LastCommitSignature()
+		bitmap := nextHeader.LastCommitBitmap()
+		BroadcastCXReceiptsWithShardID(blk.Header(), sig[:], bitmap, toShardID, c)
+	}
+	cxPool.Clear()
+	// this should not happen or maybe happen for impatient user
+	for _, entry := range sendNextTime {
+		cxPool.Add(entry)
+	}
+}
diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index 148c8cb4a9..070a85e029 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -3,6 +3,7 @@ package quorum import ( "fmt" "math/big" + "sync/atomic" "github.com/harmony-one/harmony/crypto/bls" @@ -153,7 +154,8 @@ type cIdentities struct { commit *votepower.Round // viewIDSigs: every validator // sign on |viewID|blockHash| in view changing message - viewChange *votepower.Round + viewChange *votepower.Round + participantsCount int64 } func (s *cIdentities) AggregateVotes(p Phase) *bls_core.Sign { @@ -286,10 +288,11 @@ func (s *cIdentities) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapp } s.publicKeys = pubKeys s.keyIndexMap = keyIndexMap + atomic.StoreInt64(&s.participantsCount, int64(len(s.publicKeys))) } func (s 
*cIdentities) ParticipantsCount() int64 { - return int64(len(s.publicKeys)) + return atomic.LoadInt64(&s.participantsCount) } func (s *cIdentities) SignersCount(p Phase) int64 { diff --git a/consensus/quorum/thread_safe_decider.go b/consensus/quorum/thread_safe_decider.go index 9df0ec03f1..6b6279e53e 100644 --- a/consensus/quorum/thread_safe_decider.go +++ b/consensus/quorum/thread_safe_decider.go @@ -47,8 +47,6 @@ func (a threadSafeDeciderImpl) IndexOf(key bls.SerializedPublicKey) int { } func (a threadSafeDeciderImpl) ParticipantsCount() int64 { - a.mu.Lock() - defer a.mu.Unlock() return a.decider.ParticipantsCount() } diff --git a/consensus/view_change.go b/consensus/view_change.go index a0ac6c0f79..2752270cba 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -2,15 +2,14 @@ package consensus import ( "math/big" + "sync/atomic" "time" - "github.com/harmony-one/harmony/internal/chain" - - "github.com/harmony-one/harmony/crypto/bls" - "github.com/ethereum/go-ethereum/common" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/consensus/quorum" + "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" @@ -24,7 +23,7 @@ const MaxViewIDDiff = 249 // State contains current mode and current viewID type State struct { - mode Mode + mode uint32 // current view id in normal mode // it changes per successful consensus @@ -35,43 +34,49 @@ type State struct { viewChangingID uint64 } +func NewState(mode Mode) State { + return State{ + mode: uint32(mode), + } +} + // Mode return the current node mode func (pm *State) Mode() Mode { - return pm.mode + return Mode(atomic.LoadUint32(&pm.mode)) } // SetMode set the node mode as required func (pm *State) SetMode(s Mode) { - pm.mode = s + atomic.StoreUint32(&pm.mode, uint32(s)) } // 
GetCurBlockViewID return the current view id func (pm *State) GetCurBlockViewID() uint64 { - return pm.blockViewID + return atomic.LoadUint64(&pm.blockViewID) } // SetCurBlockViewID sets the current view id func (pm *State) SetCurBlockViewID(viewID uint64) uint64 { - pm.blockViewID = viewID - return pm.blockViewID + atomic.StoreUint64(&pm.blockViewID, viewID) + return viewID } // GetViewChangingID return the current view changing id // It is meaningful during view change mode func (pm *State) GetViewChangingID() uint64 { - return pm.viewChangingID + return atomic.LoadUint64(&pm.viewChangingID) } // SetViewChangingID set the current view changing id // It is meaningful during view change mode func (pm *State) SetViewChangingID(id uint64) { - pm.viewChangingID = id + atomic.StoreUint64(&pm.viewChangingID, id) } // GetViewChangeDuraion return the duration of the current view change // It increase in the power of difference betweeen view changing ID and current view ID func (pm *State) GetViewChangeDuraion() time.Duration { - diff := int64(pm.viewChangingID - pm.blockViewID) + diff := int64(pm.GetViewChangingID() - pm.GetCurBlockViewID()) return time.Duration(diff * diff * int64(viewChangeDuration)) } @@ -152,12 +157,12 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com epoch := big.NewInt(0) if blockchain == nil { consensus.getLogger().Error().Msg("[getNextLeaderKey] Blockchain is nil. 
Use consensus.LeaderPubKey") - lastLeaderPubKey = consensus.LeaderPubKey + lastLeaderPubKey = consensus.getLeaderPubKey() } else { curHeader := blockchain.CurrentHeader() if curHeader == nil { consensus.getLogger().Error().Msg("[getNextLeaderKey] Failed to get current header from blockchain") - lastLeaderPubKey = consensus.LeaderPubKey + lastLeaderPubKey = consensus.getLeaderPubKey() } else { stuckBlockViewID := curHeader.ViewID().Uint64() + 1 gap = int(viewID - stuckBlockViewID) @@ -166,7 +171,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com if err != nil || lastLeaderPubKey == nil { consensus.getLogger().Error().Err(err). Msg("[getNextLeaderKey] Unable to get leaderPubKey from coinbase. Set it to consensus.LeaderPubKey") - lastLeaderPubKey = consensus.LeaderPubKey + lastLeaderPubKey = consensus.getLeaderPubKey() } epoch = curHeader.Epoch() // viewchange happened at the first block of new epoch @@ -183,7 +188,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com } consensus.getLogger().Info(). Str("lastLeaderPubKey", lastLeaderPubKey.Bytes.Hex()). - Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()). + Str("leaderPubKey", consensus.getLeaderPubKey().Bytes.Hex()). Int("gap", gap). Uint64("newViewID", viewID). Uint64("myCurBlockViewID", consensus.getCurBlockViewID()). @@ -212,7 +217,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com } if !wasFound { consensus.getLogger().Warn(). - Str("key", consensus.LeaderPubKey.Bytes.Hex()). + Str("key", consensus.getLeaderPubKey().Bytes.Hex()). Msg("[getNextLeaderKey] currentLeaderKey not found") } consensus.getLogger().Info(). @@ -257,13 +262,13 @@ func (consensus *Consensus) startViewChange() { // aganist the consensus.LeaderPubKey variable. 
// Ideally, we shall use another variable to keep track of the // leader pubkey in viewchange mode - consensus.LeaderPubKey = consensus.getNextLeaderKey(nextViewID, committee) + consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee)) consensus.getLogger().Warn(). Uint64("nextViewID", nextViewID). Uint64("viewChangingID", consensus.getViewChangingID()). Dur("timeoutDuration", duration). - Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()). + Str("NextLeader", consensus.getLeaderPubKey().Bytes.Hex()). Msg("[startViewChange]") consensusVCCounterVec.With(prometheus.Labels{"viewchange": "started"}).Inc() @@ -540,7 +545,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { // newView message verified success, override my state consensus.setViewIDs(recvMsg.ViewID) - consensus.LeaderPubKey = senderKey + consensus.setLeaderPubKey(senderKey) consensus.resetViewChangeState() consensus.msgSender.StopRetry(msg_pb.MessageType_VIEWCHANGE) @@ -554,7 +559,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { consensus.getLogger().Info().Msg("onNewView === announce") } consensus.getLogger().Info(). - Str("newLeaderKey", consensus.LeaderPubKey.Bytes.Hex()). + Str("newLeaderKey", consensus.getLeaderPubKey().Bytes.Hex()). 
Msg("new leader changed") consensus.consensusTimeout[timeoutConsensus].Start() consensusVCCounterVec.With(prometheus.Labels{"viewchange": "finished"}).Inc() diff --git a/consensus/view_change_msg.go b/consensus/view_change_msg.go index 6c4b080055..1ba9c76063 100644 --- a/consensus/view_change_msg.go +++ b/consensus/view_change_msg.go @@ -27,7 +27,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra BlockNum: consensus.getBlockNum(), ShardId: consensus.ShardID, SenderPubkey: priKey.Pub.Bytes[:], - LeaderPubkey: consensus.LeaderPubKey.Bytes[:], + LeaderPubkey: consensus.getLeaderPubKey().Bytes[:], }, }, } @@ -71,7 +71,7 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra consensus.getLogger().Info(). Hex("m1Payload", vcMsg.Payload). - Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()). + Str("NextLeader", consensus.getLeaderPubKey().Bytes.Hex()). Str("SenderPubKey", priKey.Pub.Bytes.Hex()). Msg("[constructViewChangeMessage]") diff --git a/consensus/view_change_test.go b/consensus/view_change_test.go index bbc6999445..42b44f7c78 100644 --- a/consensus/view_change_test.go +++ b/consensus/view_change_test.go @@ -14,14 +14,13 @@ func TestBasicViewChanging(t *testing.T) { _, _, consensus, _, err := GenerateConsensusForTesting() assert.NoError(t, err) - state := State{mode: Normal} + state := NewState(Normal) // Change Mode assert.Equal(t, state.mode, consensus.current.mode) assert.Equal(t, state.Mode(), consensus.current.Mode()) consensus.current.SetMode(ViewChanging) - assert.Equal(t, ViewChanging, consensus.current.mode) assert.Equal(t, ViewChanging, consensus.current.Mode()) // Change ViewID @@ -114,7 +113,7 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) { consensus.Decider().UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{}) assert.Equal(t, keyCount, consensus.Decider().ParticipantsCount()) - consensus.LeaderPubKey = &wrappedBLSKeys[0] + consensus.setLeaderPubKey(&wrappedBLSKeys[0]) 
nextKey := consensus.getNextLeaderKey(uint64(1), nil) assert.Equal(t, nextKey, &wrappedBLSKeys[1]) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index e516394834..484c7926a8 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -2,124 +2,10 @@ package node import ( "github.com/ethereum/go-ethereum/rlp" - proto_node "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/block" - "github.com/harmony-one/harmony/consensus" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/shard" ) -// BroadcastCXReceipts broadcasts cross shard receipts to correspoding -// destination shards -func BroadcastCXReceipts(newBlock *types.Block, consensus *consensus.Consensus) { - commitSigAndBitmap := newBlock.GetCurrentCommitSig() - //#### Read payload data from committed msg - if len(commitSigAndBitmap) <= 96 { - utils.Logger().Debug().Int("commitSigAndBitmapLen", len(commitSigAndBitmap)).Msg("[BroadcastCXReceipts] commitSigAndBitmap Not Enough Length") - return - } - commitSig := make([]byte, 96) - commitBitmap := make([]byte, len(commitSigAndBitmap)-96) - offset := 0 - copy(commitSig[:], commitSigAndBitmap[offset:offset+96]) - offset += 96 - copy(commitBitmap[:], commitSigAndBitmap[offset:]) - //#### END Read payload data from committed msg - - epoch := newBlock.Header().Epoch() - shardingConfig := shard.Schedule.InstanceForEpoch(epoch) - shardNum := int(shardingConfig.NumShards()) - myShardID := consensus.ShardID - utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]") - - for i := 0; i < shardNum; i++ { - if i == int(myShardID) { - continue - } - BroadcastCXReceiptsWithShardID(newBlock.Header(), commitSig, 
commitBitmap, uint32(i), consensus) - } -} - -// BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID -func BroadcastCXReceiptsWithShardID(block *block.Header, commitSig []byte, commitBitmap []byte, toShardID uint32, consensus *consensus.Consensus) { - myShardID := consensus.ShardID - utils.Logger().Debug(). - Uint32("toShardID", toShardID). - Uint32("myShardID", myShardID). - Uint64("blockNum", block.NumberU64()). - Msg("[BroadcastCXReceiptsWithShardID]") - - cxReceipts, err := consensus.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash()) - if err != nil || len(cxReceipts) == 0 { - utils.Logger().Debug().Uint32("ToShardID", toShardID). - Int("numCXReceipts", len(cxReceipts)). - Msg("[CXMerkleProof] No receipts found for the destination shard") - return - } - - merkleProof, err := consensus.Blockchain().CXMerkleProof(toShardID, block) - if err != nil { - utils.Logger().Warn(). - Uint32("ToShardID", toShardID). - Msg("[BroadcastCXReceiptsWithShardID] Unable to get merkleProof") - return - } - - cxReceiptsProof := &types.CXReceiptsProof{ - Receipts: cxReceipts, - MerkleProof: merkleProof, - Header: block, - CommitSig: commitSig, - CommitBitmap: commitBitmap, - } - - groupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(toShardID)) - utils.Logger().Info().Uint32("ToShardID", toShardID). - Str("GroupID", string(groupID)). - Interface("cxp", cxReceiptsProof). - Msg("[BroadcastCXReceiptsWithShardID] ReadCXReceipts and MerkleProof ready. 
Sending CX receipts...") - // TODO ek – limit concurrency - go consensus.GetHost().SendMessageToGroups([]nodeconfig.GroupID{groupID}, - p2p.ConstructMessage(proto_node.ConstructCXReceiptsProof(cxReceiptsProof)), - ) -} - -// BroadcastMissingCXReceipts broadcasts missing cross shard receipts per request -func BroadcastMissingCXReceipts(c *consensus.Consensus) { - var ( - sendNextTime = make([]core.CxEntry, 0) - cxPool = c.Registry().GetCxPool() - blockchain = c.Blockchain() - ) - it := cxPool.Pool().Iterator() - for entry := range it.C { - cxEntry := entry.(core.CxEntry) - toShardID := cxEntry.ToShardID - blk := blockchain.GetBlockByHash(cxEntry.BlockHash) - if blk == nil { - continue - } - blockNum := blk.NumberU64() - nextHeader := blockchain.GetHeaderByNumber(blockNum + 1) - if nextHeader == nil { - sendNextTime = append(sendNextTime, cxEntry) - continue - } - sig := nextHeader.LastCommitSignature() - bitmap := nextHeader.LastCommitBitmap() - BroadcastCXReceiptsWithShardID(blk.Header(), sig[:], bitmap, toShardID, c) - } - cxPool.Clear() - // this should not happen or maybe happen for impatient user - for _, entry := range sendNextTime { - cxPool.Add(entry) - } -} - // ProcessReceiptMessage store the receipts and merkle proof in local data store func (node *Node) ProcessReceiptMessage(msgPayload []byte) { cxp := types.CXReceiptsProof{} diff --git a/node/node_handler.go b/node/node_handler.go index 7aa87100bb..02c6f30884 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -14,17 +14,14 @@ import ( "github.com/harmony-one/harmony/api/proto" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/block" - "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" - 
"github.com/harmony-one/harmony/staking/availability" "github.com/harmony-one/harmony/staking/slash" staking "github.com/harmony-one/harmony/staking/types" - "github.com/harmony-one/harmony/webhooks" ) const p2pMsgPrefixSize = 5 @@ -325,97 +322,6 @@ func getCrosslinkHeadersForShards(shardChain core.BlockChain, curBlock *types.Bl return headers, nil } -// PostConsensusProcessing is called by consensus participants, after consensus is done, to: -// 1. [leader] send new block to the client -// 2. [leader] send cross shard tx receipts to destination shard -func (node *Node) PostConsensusProcessing(newBlock *types.Block) error { - if node.Consensus.IsLeader() { - if IsRunningBeaconChain(node.Consensus) { - // TODO: consider removing this and letting other nodes broadcast new blocks. - // But need to make sure there is at least 1 node that will do the job. - BroadcastNewBlock(node.host, newBlock, node.NodeConfig) - } - BroadcastCXReceipts(newBlock, node.Consensus) - } else { - if mode := node.Consensus.Mode(); mode != consensus.Listening { - numSignatures := node.Consensus.NumSignaturesIncludedInBlock(newBlock) - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Uint64("epochNum", newBlock.Epoch().Uint64()). - Uint64("ViewId", newBlock.Header().ViewID().Uint64()). - Str("blockHash", newBlock.Hash().String()). - Int("numTxns", len(newBlock.Transactions())). - Int("numStakingTxns", len(newBlock.StakingTransactions())). - Uint32("numSignatures", numSignatures). - Str("mode", mode.String()). - Msg("BINGO !!! 
Reached Consensus") - if node.Consensus.Mode() == consensus.Syncing { - mode = node.Consensus.UpdateConsensusInformation() - utils.Logger().Info().Msgf("Switching to mode %s", mode) - node.Consensus.SetMode(mode) - } - - node.Consensus.UpdateValidatorMetrics(float64(numSignatures), float64(newBlock.NumberU64())) - - // 1% of the validator also need to do broadcasting - rnd := rand.Intn(100) - if rnd < 1 { - // Beacon validators also broadcast new blocks to make sure beacon sync is strong. - if IsRunningBeaconChain(node.Consensus) { - BroadcastNewBlock(node.host, newBlock, node.NodeConfig) - } - BroadcastCXReceipts(newBlock, node.Consensus) - } - } - } - - // Broadcast client requested missing cross shard receipts if there is any - BroadcastMissingCXReceipts(node.Consensus) - - if h := node.NodeConfig.WebHooks.Hooks; h != nil { - if h.Availability != nil { - shardState, err := node.Blockchain().ReadShardState(newBlock.Epoch()) - if err != nil { - utils.Logger().Error().Err(err). - Int64("epoch", newBlock.Epoch().Int64()). - Uint32("shard-id", node.Consensus.ShardID). 
- Msg("failed to read shard state") - return err - } - for _, addr := range node.Consensus.Registry().GetAddressToBLSKey().GetAddresses(node.Consensus.GetPublicKeys(), shardState, newBlock.Epoch()) { - wrapper, err := node.Beaconchain().ReadValidatorInformation(addr) - if err != nil { - utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator info") - return nil - } - snapshot, err := node.Beaconchain().ReadValidatorSnapshot(addr) - if err != nil { - utils.Logger().Err(err).Str("addr", addr.Hex()).Msg("failed reaching validator snapshot") - return nil - } - computed := availability.ComputeCurrentSigning( - snapshot.Validator, wrapper, - ) - lastBlockOfEpoch := shard.Schedule.EpochLastBlock(node.Beaconchain().CurrentBlock().Header().Epoch().Uint64()) - - computed.BlocksLeftInEpoch = lastBlockOfEpoch - node.Beaconchain().CurrentBlock().Header().Number().Uint64() - - if err != nil && computed.IsBelowThreshold { - url := h.Availability.OnDroppedBelowThreshold - go func() { - webhooks.DoPost(url, computed) - }() - } - } - } - } - return nil -} - -func IsRunningBeaconChain(c *consensus.Consensus) bool { - return c.ShardID == shard.BeaconChainShardID -} - // BootstrapConsensus is a goroutine to check number of peers and start the consensus func (node *Node) BootstrapConsensus() error { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 867a9616dc..65c960e028 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -203,7 +203,7 @@ func TestVerifyVRF(t *testing.T) { shardState.Epoch = big.NewInt(1) shardState.Shards = append(shardState.Shards, com) - node.Consensus.LeaderPubKey = &bls.PublicKeyWrapper{Bytes: spKey, Object: pubKey} + node.Consensus.SetLeaderPubKey(&bls.PublicKeyWrapper{Bytes: spKey, Object: pubKey}) node.Worker.GetCurrentHeader().SetEpoch(big.NewInt(1)) node.Consensus.GenerateVrfAndProof(node.Worker.GetCurrentHeader()) block, _ 
:= node.Worker.FinalizeNewBlock(