Updating all sources of block.Transactions and doing the corresponding work for block staking txns
denniswon committed Apr 2, 2020
1 parent 87d3a5b commit ea54bba
Showing 8 changed files with 81 additions and 25 deletions.
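The change follows one pattern across all eight files: wherever a block's regular transactions are counted, hashed, or sender-recovered, its staking transactions are now handled alongside them. Below is a minimal sketch of that access pattern, assuming the types.Block accessors used throughout this diff (Transactions, StakingTransactions, NumberU64); the package name blockstats and the helper summarizeBlock are hypothetical and not part of the commit.

package blockstats

import (
	"fmt"

	"github.com/harmony-one/harmony/core/types"
)

// summarizeBlock is a hypothetical helper illustrating the pattern this commit
// applies everywhere: report regular and staking transactions side by side
// instead of regular transactions only.
func summarizeBlock(block *types.Block) string {
	return fmt.Sprintf("block %d: %d txs, %d staking txs",
		block.NumberU64(), len(block.Transactions()), len(block.StakingTransactions()))
}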
1 change: 1 addition & 0 deletions cmd/client/txgen/main.go
@@ -207,6 +207,7 @@ syncLoop:
if txGen.Consensus.ShardID == shardID {
utils.Logger().Info().
Int("txNum", len(block.Transactions())).
Int("stakingTxnNum", len(block.StakingTransactions())).
Uint32("shardID", shardID).
Str("preHash", block.ParentHash().Hex()).
Uint64("currentBlock", txGen.Blockchain().CurrentBlock().NumberU64()).
22 changes: 15 additions & 7 deletions consensus/consensus_service.go
@@ -361,6 +361,7 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
endTime := time.Now()
timeElapsed := endTime.Sub(startTime)
numOfTxs := len(block.Transactions())
numOfStakingTxs := len(block.StakingTransactions())
tps := float64(numOfTxs) / timeElapsed.Seconds()
consensus.getLogger().Info().
Int("numOfTXs", numOfTxs).
@@ -381,14 +382,21 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
txHash := block.Transactions()[end-1-i].Hash()
txHashes = append(txHashes, hex.EncodeToString(txHash[:]))
}
stakingTxHashes := []string{}
for i, end := 0, len(block.StakingTransactions()); i < 3 && i < end; i++ {
txHash := block.StakingTransactions()[end-1-i].Hash()
stakingTxHashes = append(stakingTxHashes, hex.EncodeToString(txHash[:]))
}
metrics := map[string]interface{}{
"key": hex.EncodeToString(consensus.LeaderPubKey.Serialize()),
"tps": tps,
"txCount": numOfTxs,
"nodeCount": consensus.Decider.ParticipantsCount() + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond),
"key": hex.EncodeToString(consensus.LeaderPubKey.Serialize()),
"tps": tps,
"txCount": numOfTxs,
"stakingTxCount": numOfStakingTxs,
"nodeCount": consensus.Decider.ParticipantsCount() + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes,
"latestStakingTxHashes": stakingTxHashes,
"blockLatency": int(timeElapsed / time.Millisecond),
}
profiler.LogMetrics(metrics)
}
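The two loops above gather the hex hashes of the last three regular and staking transactions for the metrics payload. Below is a hedged sketch of the regular-transaction loop factored into a helper; lastTxHashes is hypothetical and not part of the commit, and it assumes the encoding/hex and harmony core/types imports that consensus_service.go already has. The staking loop has the same shape over staking.StakingTransactions.

// lastTxHashes returns the hex-encoded hashes of the last n regular
// transactions, newest first, mirroring the collection loop in reportMetrics.
func lastTxHashes(txs types.Transactions, n int) []string {
	hashes := []string{}
	for i, end := 0, len(txs); i < n && i < end; i++ {
		h := txs[end-1-i].Hash()
		hashes = append(hashes, hex.EncodeToString(h[:]))
	}
	return hashes
}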
1 change: 1 addition & 0 deletions consensus/consensus_v2.go
@@ -468,6 +468,7 @@ func (consensus *Consensus) Start(

consensus.getLogger().Debug().
Int("numTxs", len(newBlock.Transactions())).
Int("numStakingTxs", len(newBlock.StakingTransactions())).
Time("startTime", startTime).
Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
30 changes: 26 additions & 4 deletions core/blockchain.go
@@ -909,12 +909,12 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {
signer := types.MakeSigner(config, block.Epoch())

transactions, logIndex := block.Transactions(), uint(0)
if len(transactions) != len(receipts) {
return errors.New("transaction and receipt count mismatch")
transactions, stakingTransactions, logIndex := block.Transactions(), block.StakingTransactions(), uint(0)
if len(transactions)+len(stakingTransactions) != len(receipts) {
return errors.New("transaction+stakingTransactions and receipt count mismatch")
}

for j := 0; j < len(receipts); j++ {
for j := 0; j < len(transactions); j++ {
// The transaction hash can be retrieved from the transaction itself
receipts[j].TxHash = transactions[j].Hash()

@@ -940,6 +940,27 @@ func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {
logIndex++
}
}
// Within a block, regular transactions are processed before staking transactions,
// so the first len(transactions) receipts belong to regular transactions and the
// remainder to staking transactions.
for j := len(transactions); j < len(transactions)+len(stakingTransactions); j++ {
// The transaction hash can be retrieved from the staking transaction itself
receipts[j].TxHash = stakingTransactions[j-len(transactions)].Hash()

// The used gas can be calculated based on previous receipts
if j == 0 {
receipts[j].GasUsed = receipts[j].CumulativeGasUsed
} else {
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
}
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j)
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
return nil
}

@@ -1358,6 +1379,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
Str("hash", block.Hash().Hex()).
Int("uncles", len(block.Uncles())).
Int("txs", len(block.Transactions())).
Int("stakingTxs", len(block.StakingTransactions())).
Uint64("gas", block.GasUsed()).
Str("elapsed", common.PrettyDuration(time.Since(bstart)).String()).
Logger()
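The second loop added to SetReceiptsData depends on a fixed receipt ordering: the first len(transactions) receipts belong to the block's regular transactions and the rest to its staking transactions, in block order. Below is a hedged sketch of that index mapping; txHashForReceipt is hypothetical and not part of the commit, and it assumes go-ethereum's common and harmony's core/types packages, both already imported by core/blockchain.go.

// txHashForReceipt sketches the indexing convention SetReceiptsData assumes:
// receipts[0:len(txs)] line up with the block's regular transactions and
// receipts[len(txs):] with its staking transactions, in order.
func txHashForReceipt(block *types.Block, j int) common.Hash {
	txs, stxs := block.Transactions(), block.StakingTransactions()
	if j < len(txs) {
		return txs[j].Hash()
	}
	return stxs[j-len(txs)].Hash()
}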
5 changes: 4 additions & 1 deletion core/rawdb/accessors_chain_test.go
@@ -34,6 +34,7 @@ import (
blockfactory "github.com/harmony-one/harmony/block/factory"
mock "github.com/harmony-one/harmony/core/rawdb/mock"
"github.com/harmony-one/harmony/core/types"
staking "github.com/harmony-one/harmony/staking/types"
)

// Tests block header storage and retrieval operations.
@@ -142,7 +143,9 @@ func TestBlockStorage(t *testing.T) {
}
if entry := ReadBody(db, block.Hash(), block.NumberU64()); entry == nil {
t.Fatalf("Stored body not found")
} else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) {
} else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) ||
types.DeriveSha(staking.StakingTransactions(entry.StakingTransactions())) != types.DeriveSha(block.StakingTransactions()) ||
types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) {
t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body())
}
//if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil {
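The updated assertion compares three roots: the transaction trie root, the staking-transaction trie root, and the uncle hash. Below is a hedged restatement of that condition as a standalone predicate; bodiesMatch is hypothetical and not part of the commit, and it assumes ReadBody's *types.Body return type plus the types and staking imports the test file already uses.

// bodiesMatch mirrors the test's condition: a stored body matches a block
// when its transaction root, staking-transaction root, and uncle hash all
// agree with the block's.
func bodiesMatch(entry *types.Body, block *types.Block) bool {
	return types.DeriveSha(types.Transactions(entry.Transactions())) == types.DeriveSha(block.Transactions()) &&
		types.DeriveSha(staking.StakingTransactions(entry.StakingTransactions())) == types.DeriveSha(block.StakingTransactions()) &&
		types.CalcUncleHash(entry.Uncles()) == types.CalcUncleHash(block.Uncles())
}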
45 changes: 32 additions & 13 deletions core/tx_cacher.go
@@ -19,7 +19,8 @@ package core
import (
"runtime"

"github.com/ethereum/go-ethereum/core/types"
"github.com/harmony-one/harmony/core/types"
staking "github.com/harmony-one/harmony/staking/types"
)

// senderCacher is a concurrent transaction sender recoverer and cacher.
@@ -32,9 +33,10 @@ var senderCacher = newTxSenderCacher(runtime.NumCPU())
// which is used to feed the same underlying input array to different threads but
// ensure they process the early transactions fast.
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
signer types.Signer
txs []*types.Transaction
stakingTxs []*staking.StakingTransaction
inc int
}

// txSenderCacher is a helper structure to concurrently ecrecover transaction
@@ -70,22 +72,31 @@ func (cacher *txSenderCacher) cache() {
// recover recovers the senders from a batch of transactions and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction, stakingTxs []*staking.StakingTransaction) {
// If there's nothing to recover, abort
if len(txs) == 0 {
if len(txs)+len(stakingTxs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
if len(txs)+len(stakingTxs) < tasks*4 {
tasks = (len(txs) + len(stakingTxs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
if i < len(txs) {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
}
} else {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
stakingTxs: stakingTxs[i:],
inc: tasks,
}
}

}
}

@@ -101,5 +112,13 @@ func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*types.Block) {
for _, block := range blocks {
txs = append(txs, block.Transactions()...)
}
cacher.recover(signer, txs)
stakingCount := 0
for _, block := range blocks {
stakingCount += len(block.StakingTransactions())
}
stakingTxs := make([]*staking.StakingTransaction, 0, stakingCount)
for _, block := range blocks {
stakingTxs = append(stakingTxs, block.StakingTransactions()...)
}
cacher.recover(signer, txs, stakingTxs)
}
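senderCacher and recover are unexported, so callers sit inside package core (recoverFromBlocks above is one). Below is a hedged sketch of how another in-package caller could warm the sender cache for a single block under the new signature; warmSenderCache is hypothetical and not part of the commit, and it relies only on the types already imported by tx_cacher.go.

// warmSenderCache pre-computes and caches the senders of a block's regular
// and staking transactions using the extended recover signature.
func warmSenderCache(signer types.Signer, block *types.Block) {
	senderCacher.recover(signer, block.Transactions(), block.StakingTransactions())
}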
1 change: 1 addition & 0 deletions node/node_cross_link.go
@@ -51,6 +51,7 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
"crossLinkShard", crossLink.ShardID(),
"crossLinkBlock", crossLink.BlockNum(),
"numTx", len(block.Transactions()),
"numStakingTx", len(block.StakingTransactions()),
).WithCause(err)
}
}
1 change: 1 addition & 0 deletions node/node_handler.go
@@ -392,6 +392,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
return ctxerror.New("[VerifyNewBlock] Cannot Verify New Block!!!",
"blockHash", newBlock.Hash(),
"numTx", len(newBlock.Transactions()),
"numStakingTx", len(newBlock.StakingTransactions()),
).WithCause(err)
}

