Skip to content

Commit

Permalink
fix vote and block insertion race condition (ethereum#51)
Browse files Browse the repository at this point in the history
* fix vote and block insertion race condition

* fix race condition in the vote handler using multiple go routine

* check go routine race condition during ci cd

* remove race check as there are eth code that is failing

* remove unused signature list variable
  • Loading branch information
wjrjerome authored Feb 3, 2022
1 parent 7cc2bef commit 23cbf68
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 39 deletions.
67 changes: 52 additions & 15 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,19 +544,22 @@ func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *utils.
*/
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *utils.Vote) (bool, error) {
/*
1. Get masterNode list belong to this epoch by hash
1. Get masterNode list from snapshot
2. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
3. Verify blockInfo
4. Broadcast(Not part of consensus)
*/
epochInfo, err := x.getEpochSwitchInfo(chain, nil, vote.ProposedBlockInfo.Hash)
snapshot, err := x.getSnapshot(chain, vote.ProposedBlockInfo.Number.Uint64())
if err != nil {
log.Error("[VerifyVoteMessage] Error when getting epoch switch Info to verify vote message", "Error", err)
log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "BlockNum", vote.ProposedBlockInfo.Number, "Hash", vote.ProposedBlockInfo.Hash, "Error", err.Error())
}
return x.verifyMsgSignature(utils.VoteSigHash(vote.ProposedBlockInfo), vote.Signature, epochInfo.Masternodes)
verified, err := x.verifyMsgSignature(utils.VoteSigHash(vote.ProposedBlockInfo), vote.Signature, snapshot.NextEpochMasterNodes)
if err != nil {
log.Error("[VerifyVoteMessage] Error while verifying vote message", "Error", err.Error())
}
return verified, err
}

// Consensus entry point for processing vote message to produce QC
Expand All @@ -583,18 +586,16 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
log.Info(fmt.Sprintf("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool))

// Check if the block already exist, otherwise we try luck with the next vote
proposedBlock := chain.GetHeaderByHash(voteMsg.ProposedBlockInfo.Hash)
if proposedBlock == nil {
proposedBlockHeader := chain.GetHeaderByHash(voteMsg.ProposedBlockInfo.Hash)
if proposedBlockHeader == nil {
log.Warn("[voteHandler] The proposed block from vote message does not exist yet, wait for the next vote to try again", "Hash", voteMsg.ProposedBlockInfo.Hash, "Round", voteMsg.ProposedBlockInfo.Round)
return nil
}

err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg)
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader)
if err != nil {
return err
}
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(voteMsg)
}

return nil
Expand All @@ -604,22 +605,55 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
Function that will be called by votePool when it reached threshold.
In the engine v2, we will need to generate and process QC
*/
func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj) error {
signatures := []utils.Signature{}
for _, v := range pooledVotes {
signatures = append(signatures, v.(*utils.Vote).Signature)
func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj, proposedBlockHeader *types.Header) error {

masternodes := x.GetMasternodes(chain, proposedBlockHeader)

// Filter out non-Master nodes signatures
var wg sync.WaitGroup
wg.Add(len(pooledVotes))
signatureSlice := make([]utils.Signature, len(pooledVotes))
counter := 0
for h, vote := range pooledVotes {
go func(hash common.Hash, v *utils.Vote, i int) {
defer wg.Done()
verified, err := x.verifyMsgSignature(utils.VoteSigHash(v.ProposedBlockInfo), v.Signature, masternodes)
if !verified || err != nil {
log.Warn("[onVotePoolThresholdReached] Skip not verified vote signatures when building QC", "Error", err.Error(), "verified", verified)
} else {
signatureSlice[i] = v.Signature
}
}(h, vote.(*utils.Vote), counter)
counter++
}
wg.Wait()

// The signature list may contain empty entey. we only care the ones with values
var validSignatureSlice []utils.Signature
for _, v := range signatureSlice {
if len(v) != 0 {
validSignatureSlice = append(validSignatureSlice, v)
}
}

// Skip and wait for the next vote to process again if valid votes is less than what we required
if len(validSignatureSlice) < x.config.V2.CertThreshold {
log.Warn("[onVotePoolThresholdReached] Not enough valid signatures to generate QC", "VotesSignaturesAfterFilter", validSignatureSlice, "NumberOfValidVotes", len(validSignatureSlice), "NumberOfVotes", len(pooledVotes))
return nil
}
// Genrate QC
quorumCert := &utils.QuorumCert{
ProposedBlockInfo: currentVoteMsg.(*utils.Vote).ProposedBlockInfo,
Signatures: signatures,
Signatures: validSignatureSlice,
}
err := x.processQC(chain, quorumCert)
if err != nil {
log.Error("Error while processing QC in the Vote handler after reaching pool threshold, ", err)
return err
}
log.Info("🗳 Successfully processed the vote and produced QC!")
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(currentVoteMsg)
return nil
}

Expand Down Expand Up @@ -1004,6 +1038,9 @@ func (x *XDPoS_v2) signSignature(signingHash common.Hash) (utils.Signature, erro
}

func (x *XDPoS_v2) verifyMsgSignature(signedHashToBeVerified common.Hash, signature utils.Signature, masternodes []common.Address) (bool, error) {
if len(masternodes) == 0 {
return false, fmt.Errorf("Empty masternode list detected when verifying message signatures")
}
// Recover the public key and the Ethereum address
pubkey, err := crypto.Ecrecover(signedHashToBeVerified.Bytes(), signature)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consensus/tests/countdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
// We can only test valid = false for now as the implementation for getCurrentRoundMasterNodes is not complete
assert.False(t, valid)
// This shows we are able to decode the timeout message, which is what this test is all about
assert.Regexp(t, "^Masternodes does not contain signer addres.*", err.Error())
assert.Regexp(t, "Empty masternode list detected when verifying message signatures", err.Error())
}
62 changes: 56 additions & 6 deletions consensus/tests/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package tests
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"strings"
"testing"
"time"

"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends"
"github.com/XinFinOrg/XDPoSChain/accounts/keystore"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
Expand Down Expand Up @@ -60,6 +65,50 @@ func debugMessage(backend *backends.SimulatedBackend, signers signersList, t *te
}
}

func SignHashByPK(pk *ecdsa.PrivateKey, itemToSign []byte) []byte {
signer, signFn, err := getSignerAndSignFn(pk)
if err != nil {
panic(err)
}
signedHash, err := signFn(accounts.Account{Address: signer}, itemToSign)
if err != nil {
panic(err)
}
return signedHash
}

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func RandStringBytes(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}

func getSignerAndSignFn(pk *ecdsa.PrivateKey) (common.Address, func(account accounts.Account, hash []byte) ([]byte, error), error) {
veryLightScryptN := 2
veryLightScryptP := 1
dir, _ := ioutil.TempDir("", fmt.Sprintf("eth-getSignerAndSignFn-test-%v", RandStringBytes(5)))

new := func(kd string) *keystore.KeyStore {
return keystore.NewKeyStore(kd, veryLightScryptN, veryLightScryptP)
}

defer os.RemoveAll(dir)
ks := new(dir)
pass := "" // not used but required by API
a1, err := ks.ImportECDSA(pk, pass)
if err != nil {
return common.Address{}, nil, fmt.Errorf(err.Error())
}
if err := ks.Unlock(a1, ""); err != nil {
return a1.Address, nil, fmt.Errorf(err.Error())
}
return a1.Address, ks.SignHash, nil
}

func getCommonBackend(t *testing.T, chainConfig *params.ChainConfig) *backends.SimulatedBackend {

// initial helper backend
Expand Down Expand Up @@ -229,12 +278,13 @@ func GetCandidateFromCurrentSmartContract(backend bind.ContractBackend, t *testi
func PrepareXDCTestBlockChain(t *testing.T, numOfBlocks int, chainConfig *params.ChainConfig) (*BlockChain, *backends.SimulatedBackend, *types.Block, common.Address) {
// Preparation
var err error
// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()

backend := getCommonBackend(t, chainConfig)
blockchain := backend.GetBlockChain()
blockchain.Client = backend

// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
Expand Down Expand Up @@ -279,15 +329,15 @@ func PrepareXDCTestBlockChain(t *testing.T, numOfBlocks int, chainConfig *params
func PrepareXDCTestBlockChainForV2Engine(t *testing.T, numOfBlocks int, chainConfig *params.ChainConfig, numOfForkedBlocks int) (*BlockChain, *backends.SimulatedBackend, *types.Block, common.Address, func(account accounts.Account, hash []byte) ([]byte, error), *types.Block) {
// Preparation
var err error
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
backend := getCommonBackend(t, chainConfig)
blockchain := backend.GetBlockChain()
blockchain.Client = backend

// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
blockchain.Engine().(*XDPoS.XDPoS).Authorize(signer, signFn)

currentBlock := blockchain.Genesis()
Expand Down
Loading

0 comments on commit 23cbf68

Please sign in to comment.