Skip to content

Commit

Permalink
Updating all sources of block.Transactions and do the corresponding w…
Browse files Browse the repository at this point in the history
…ork for block staking txns
  • Loading branch information
denniswon committed Apr 2, 2020
1 parent c8df6e3 commit 603455e
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 18 deletions.
1 change: 1 addition & 0 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func (consensus *Consensus) Start(

consensus.getLogger().Debug().
Int("numTxs", len(newBlock.Transactions())).
Int("numStakingTxs", len(newBlock.StakingTransactions())).
Time("startTime", startTime).
Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
Expand Down
30 changes: 26 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,12 +909,12 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {
signer := types.MakeSigner(config, block.Epoch())

transactions, logIndex := block.Transactions(), uint(0)
if len(transactions) != len(receipts) {
return errors.New("transaction and receipt count mismatch")
transactions, stakingTransactions, logIndex := block.Transactions(), block.StakingTransactions(), uint(0)
if len(transactions)+len(stakingTransactions) != len(receipts) {
return errors.New("transaction+stakingTransactions and receipt count mismatch")
}

for j := 0; j < len(receipts); j++ {
for j := 0; j < len(transactions); j++ {
// The transaction hash can be retrieved from the transaction itself
receipts[j].TxHash = transactions[j].Hash()

Expand All @@ -940,6 +940,27 @@ func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts ty
logIndex++
}
}
// in a block, txns are processed before staking txns
for j := len(transactions); j < len(transactions)+len(stakingTransactions); j++ {
// The transaction hash can be retrieved from the staking transaction itself
receipts[j].TxHash = stakingTransactions[j].Hash()

// The used gas can be calculated based on previous receipts
if j == 0 {
receipts[j].GasUsed = receipts[j].CumulativeGasUsed
} else {
receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed
}
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j) + uint(len(transactions))
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
return nil
}

Expand Down Expand Up @@ -1358,6 +1379,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifyHeaders bool) (int,
Str("hash", block.Hash().Hex()).
Int("uncles", len(block.Uncles())).
Int("txs", len(block.Transactions())).
Int("stakingTxs", len(block.StakingTransactions())).
Uint64("gas", block.GasUsed()).
Str("elapsed", common.PrettyDuration(time.Since(bstart)).String()).
Logger()
Expand Down
5 changes: 4 additions & 1 deletion core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
blockfactory "github.com/harmony-one/harmony/block/factory"
mock "github.com/harmony-one/harmony/core/rawdb/mock"
"github.com/harmony-one/harmony/core/types"
staking "github.com/harmony-one/harmony/staking/types"
)

// Tests block header storage and retrieval operations.
Expand Down Expand Up @@ -142,7 +143,9 @@ func TestBlockStorage(t *testing.T) {
}
if entry := ReadBody(db, block.Hash(), block.NumberU64()); entry == nil {
t.Fatalf("Stored body not found")
} else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) {
} else if types.DeriveSha(types.Transactions(entry.Transactions())) != types.DeriveSha(block.Transactions()) ||
types.DeriveSha(staking.StakingTransactions(entry.StakingTransactions())) != types.DeriveSha(block.StakingTransactions()) ||
types.CalcUncleHash(entry.Uncles()) != types.CalcUncleHash(block.Uncles()) {
t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body())
}
//if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil {
Expand Down
45 changes: 32 additions & 13 deletions core/tx_cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package core
import (
"runtime"

"github.com/ethereum/go-ethereum/core/types"
"github.com/harmony-one/harmony/core/types"
staking "github.com/harmony-one/harmony/staking/types"
)

// senderCacher is a concurrent transaction sender recoverer anc cacher.
Expand All @@ -32,9 +33,10 @@ var senderCacher = newTxSenderCacher(runtime.NumCPU())
// which is used to feed the same underlying input array to different threads but
// ensure they process the early transactions fast.
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
signer types.Signer
txs []*types.Transaction
stakingTxs []*staking.StakingTransaction
inc int
}

// txSenderCacher is a helper structure to concurrently ecrecover transaction
Expand Down Expand Up @@ -70,22 +72,31 @@ func (cacher *txSenderCacher) cache() {
// recover recovers the senders from a batch of transactions and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction, stakingTxs []*staking.StakingTransaction) {
// If there's nothing to recover, abort
if len(txs) == 0 {
if len(txs)+len(stakingTxs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
if len(txs)+len(stakingTxs) < tasks*4 {
tasks = (len(txs) + len(stakingTxs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
if i < len(txs) {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
}
} else {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
stakingTxs: stakingTxs[i:],
inc: tasks,
}
}

}
}

Expand All @@ -101,5 +112,13 @@ func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*t
for _, block := range blocks {
txs = append(txs, block.Transactions()...)
}
cacher.recover(signer, txs)
stakingCount := 0
for _, block := range blocks {
stakingCount += len(block.StakingTransactions())
}
stakingTxs := make([]*staking.StakingTransaction, 0, stakingCount)
for _, block := range blocks {
stakingTxs = append(stakingTxs, block.StakingTransactions()...)
}
cacher.recover(signer, txs, stakingTxs)
}
1 change: 1 addition & 0 deletions node/node_cross_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
"crossLinkShard", crossLink.ShardID(),
"crossLinkBlock", crossLink.BlockNum(),
"numTx", len(block.Transactions()),
"numStakingTx", len(block.StakingTransactions()),
).WithCause(err)
}
}
Expand Down
1 change: 1 addition & 0 deletions node/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
return ctxerror.New("[VerifyNewBlock] Cannot Verify New Block!!!",
"blockHash", newBlock.Hash(),
"numTx", len(newBlock.Transactions()),
"numStakingTx", len(newBlock.StakingTransactions()),
).WithCause(err)
}

Expand Down

0 comments on commit 603455e

Please sign in to comment.