Skip to content

Commit

Permalink
upgrade rlp, transactions process optimization (#111)
Browse files Browse the repository at this point in the history
* upgrade rlp to go-ethereum v.14.6

* core/types: transaction and receipt encoding/decoding optimizations #27976

* core/types: use new atomic types in caches #29411, fix eth/handler bug

* eth/fetcher: throttle peers which deliver many invalid transactions #25573

* core: preallocate map in tx_pool #25737

* core/txpool: protect cache with mutex #27898

* upgrade memsize to v0.0.2
  • Loading branch information
ryanmorphl2 authored Jul 9, 2024
1 parent 5f73e27 commit d964f78
Show file tree
Hide file tree
Showing 31 changed files with 488 additions and 190 deletions.
6 changes: 5 additions & 1 deletion cmd/devp2p/internal/ethtest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
}
return nil

// ignore tx announcements from previous tests
case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
continue
case *Transactions:
continue

default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,13 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg))
}
return

// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue

// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
10 changes: 6 additions & 4 deletions cmd/devp2p/internal/ethtest/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/scroll-tech/go-ethereum/params"
)

//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")

func (s *Suite) sendSuccessfulTxs(t *utesting.T, isEth66 bool) error {
Expand Down Expand Up @@ -200,10 +200,12 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
}
// update nonce
nonce = txs[len(txs)-1].Nonce()
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated

// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within a couple announcements.
recvHashes := make([]common.Hash, 0)
// all txs should be announced within 3 announcements
for i := 0; i < 3; i++ {

for i := 0; i < 20; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions:
for _, tx := range *msg {
Expand Down
34 changes: 27 additions & 7 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"container/heap"
"math"
"math/big"
"slices"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -49,16 +50,18 @@ func (h *nonceHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = 0
*h = old[0 : n-1]
return x
}

// txSortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type txSortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
}

// newTxSortedMap creates a new nonce-sorted transaction map.
Expand All @@ -81,7 +84,9 @@ func (m *txSortedMap) Put(tx *types.Transaction) {
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
}
m.cacheMu.Lock()
m.items[nonce], m.cache = tx, nil
m.cacheMu.Unlock()
}

// Forward removes all transactions from the map with a nonce lower than the
Expand All @@ -97,9 +102,11 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
delete(m.items, nonce)
}
// If we had a cached order, shift the front
m.cacheMu.Lock()
if m.cache != nil {
m.cache = m.cache[len(removed):]
}
m.cacheMu.Unlock()
return removed
}

Expand All @@ -123,7 +130,9 @@ func (m *txSortedMap) reheap() {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()
}

// filter is identical to Filter, but **does not** regenerate the heap. This method
Expand All @@ -139,7 +148,9 @@ func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transac
}
}
if len(removed) > 0 {
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()
}
return removed
}
Expand All @@ -153,19 +164,21 @@ func (m *txSortedMap) Cap(threshold int) types.Transactions {
}
// Otherwise gather and drop the highest nonce'd transactions
var drops types.Transactions

sort.Sort(*m.index)
slices.Sort(*m.index)
for size := len(m.items); size > threshold; size-- {
drops = append(drops, m.items[(*m.index)[size-1]])
delete(m.items, (*m.index)[size-1])
}
*m.index = (*m.index)[:threshold]
heap.Init(m.index)
// The sorted m.index slice is still a valid heap, so there is no need to
// reheap after deleting tail items.

// If we had a cache, shift the back
m.cacheMu.Lock()
if m.cache != nil {
m.cache = m.cache[:len(m.cache)-len(drops)]
}
m.cacheMu.Unlock()
return drops
}

Expand All @@ -185,7 +198,9 @@ func (m *txSortedMap) Remove(nonce uint64) bool {
}
}
delete(m.items, nonce)
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()

return true
}
Expand All @@ -195,7 +210,7 @@ func (m *txSortedMap) Remove(nonce uint64) bool {
// removed from the list.
//
// Note, all transactions with nonces lower than start will also be returned to
// prevent getting into and invalid state. This is not something that should ever
// prevent getting into an invalid state. This is not something that should ever
// happen but better to be self correcting than failing!
func (m *txSortedMap) Ready(start uint64) types.Transactions {
// Short circuit if no transactions are available
Expand All @@ -209,7 +224,9 @@ func (m *txSortedMap) Ready(start uint64) types.Transactions {
delete(m.items, next)
heap.Pop(m.index)
}
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()

return ready
}
Expand All @@ -220,6 +237,8 @@ func (m *txSortedMap) Len() int {
}

func (m *txSortedMap) flatten() types.Transactions {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
Expand Down Expand Up @@ -604,6 +623,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool

// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list
//
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
Expand Down
6 changes: 3 additions & 3 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,11 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
pool.mu.Lock()
defer pool.mu.Unlock()

pending := make(map[common.Address]types.Transactions)
pending := make(map[common.Address]types.Transactions, len(pool.pending))
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
queued := make(map[common.Address]types.Transactions)
queued := make(map[common.Address]types.Transactions, len(pool.queue))
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
Expand Down Expand Up @@ -1677,7 +1677,7 @@ type accountSet struct {
// derivations.
func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
as := &accountSet{
accounts: make(map[common.Address]struct{}),
accounts: make(map[common.Address]struct{}, len(addrs)),
signer: signer,
}
for _, addr := range addrs {
Expand Down
18 changes: 18 additions & 0 deletions core/types/hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package types

import (
"bytes"
"fmt"
"math"
"sync"

"golang.org/x/crypto/sha3"
Expand All @@ -37,6 +39,22 @@ var encodeBufferPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}

// getPooledBuffer retrieves a buffer from the pool and creates a byte slice of the
// requested size from it.
//
// The caller should return the *bytes.Buffer object back into encodeBufferPool after use!
// The returned byte slice must not be used after returning the buffer.
func getPooledBuffer(size uint64) ([]byte, *bytes.Buffer, error) {
if size > math.MaxInt {
return nil, nil, fmt.Errorf("can't get buffer of size %d", size)
}
buf := encodeBufferPool.Get().(*bytes.Buffer)
buf.Reset()
buf.Grow(int(size))
b := buf.Bytes()[:int(size)]
return b, buf, nil
}

// rlpHash encodes x and hashes the encoded bytes.
func rlpHash(x interface{}) (h common.Hash) {
sha := hasherPool.Get().(crypto.KeccakState)
Expand Down
21 changes: 12 additions & 9 deletions core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ var (
receiptStatusSuccessfulRLP = []byte{0x01}
)

// This error is returned when a typed receipt is decoded, but the string is empty.
var errEmptyTypedReceipt = errors.New("empty typed receipt bytes")
var errShortTypedReceipt = errors.New("typed receipt too short")

const (
// ReceiptStatusFailed is the status code of a transaction if execution failed.
Expand Down Expand Up @@ -191,7 +190,7 @@ func (r *Receipt) MarshalBinary() ([]byte, error) {
// DecodeRLP implements rlp.Decoder, and loads the consensus fields of a receipt
// from an RLP stream.
func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
kind, _, err := s.Kind()
kind, size, err := s.Kind()
switch {
case err != nil:
return err
Expand All @@ -203,15 +202,19 @@ func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
}
r.Type = LegacyTxType
return r.setFromRLP(dec)
case kind == rlp.String:
case kind == rlp.Byte:
return errShortTypedReceipt
default:
// It's an EIP-2718 typed tx receipt.
b, err := s.Bytes()
b, buf, err := getPooledBuffer(size)
if err != nil {
return err
}
defer encodeBufferPool.Put(buf)
if err := s.ReadBytes(b); err != nil {
return err
}
return r.decodeTyped(b)
default:
return rlp.ErrExpectedList
}
}

Expand All @@ -234,8 +237,8 @@ func (r *Receipt) UnmarshalBinary(b []byte) error {

// decodeTyped decodes a typed receipt from the canonical format.
func (r *Receipt) decodeTyped(b []byte) error {
if len(b) == 0 {
return errEmptyTypedReceipt
if len(b) <= 1 {
return errShortTypedReceipt
}
switch b[0] {
case DynamicFeeTxType, AccessListTxType, BlobTxType, L1MessageTxType:
Expand Down
2 changes: 1 addition & 1 deletion core/types/receipt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDecodeEmptyTypedReceipt(t *testing.T) {
input := []byte{0x80}
var r Receipt
err := rlp.DecodeBytes(input, &r)
if err != errEmptyTypedReceipt {
if err != errShortTypedReceipt {
t.Fatal("wrong error:", err)
}
}
Expand Down
Loading

0 comments on commit d964f78

Please sign in to comment.