Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade rlp, transactions process optimization #113

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/devp2p/internal/ethtest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,13 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
return fmt.Errorf("wrong block hash in announcement: expected %v, got %v", blockAnnouncement.Block.Hash(), hashes[0].Hash)
}
return nil

// ignore tx announcements from previous tests
case *NewPooledTransactionHashes:
// ignore tx announcements from previous tests
continue
case *Transactions:
continue

default:
return fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,13 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) {
t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg))
}
return

// ignore propagated txs from previous tests
case *NewPooledTransactionHashes:
continue
case *Transactions:
continue

// ignore block announcements from previous tests
case *NewBlockHashes:
continue
Expand Down
10 changes: 6 additions & 4 deletions cmd/devp2p/internal/ethtest/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/scroll-tech/go-ethereum/params"
)

//var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
// var faucetAddr = common.HexToAddress("0x71562b71999873DB5b286dF957af199Ec94617F7")
var faucetKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")

func (s *Suite) sendSuccessfulTxs(t *utesting.T, isEth66 bool) error {
Expand Down Expand Up @@ -200,10 +200,12 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
}
// update nonce
nonce = txs[len(txs)-1].Nonce()
// Wait for the transaction announcement(s) and make sure all sent txs are being propagated

// Wait for the transaction announcement(s) and make sure all sent txs are being propagated.
// all txs should be announced within a couple announcements.
recvHashes := make([]common.Hash, 0)
// all txs should be announced within 3 announcements
for i := 0; i < 3; i++ {

for i := 0; i < 20; i++ {
switch msg := recvConn.readAndServe(s.chain, timeout).(type) {
case *Transactions:
for _, tx := range *msg {
Expand Down
34 changes: 27 additions & 7 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"container/heap"
"math"
"math/big"
"slices"

Check failure on line 23 in core/tx_list.go

View workflow job for this annotation

GitHub Actions / build-mock-ccc-geth

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.20.14/x64/src/slices)
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -48,16 +49,18 @@
old := *h
n := len(old)
x := old[n-1]
old[n-1] = 0
*h = old[0 : n-1]
return x
}

// txSortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type txSortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
}

// newTxSortedMap creates a new nonce-sorted transaction map.
Expand All @@ -80,7 +83,9 @@
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
}
m.cacheMu.Lock()
m.items[nonce], m.cache = tx, nil
m.cacheMu.Unlock()
}

// Forward removes all transactions from the map with a nonce lower than the
Expand All @@ -96,9 +101,11 @@
delete(m.items, nonce)
}
// If we had a cached order, shift the front
m.cacheMu.Lock()
if m.cache != nil {
m.cache = m.cache[len(removed):]
}
m.cacheMu.Unlock()
return removed
}

Expand All @@ -122,7 +129,9 @@
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()
}

// filter is identical to Filter, but **does not** regenerate the heap. This method
Expand All @@ -138,7 +147,9 @@
}
}
if len(removed) > 0 {
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()
}
return removed
}
Expand All @@ -152,19 +163,21 @@
}
// Otherwise gather and drop the highest nonce'd transactions
var drops types.Transactions

sort.Sort(*m.index)
slices.Sort(*m.index)
for size := len(m.items); size > threshold; size-- {
drops = append(drops, m.items[(*m.index)[size-1]])
delete(m.items, (*m.index)[size-1])
}
*m.index = (*m.index)[:threshold]
heap.Init(m.index)
// The sorted m.index slice is still a valid heap, so there is no need to
// reheap after deleting tail items.

// If we had a cache, shift the back
m.cacheMu.Lock()
if m.cache != nil {
m.cache = m.cache[:len(m.cache)-len(drops)]
}
m.cacheMu.Unlock()
return drops
}

Expand All @@ -184,7 +197,9 @@
}
}
delete(m.items, nonce)
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()

return true
}
Expand All @@ -194,7 +209,7 @@
// removed from the list.
//
// Note, all transactions with nonces lower than start will also be returned to
// prevent getting into and invalid state. This is not something that should ever
// prevent getting into an invalid state. This is not something that should ever
// happen but better to be self correcting than failing!
func (m *txSortedMap) Ready(start uint64) types.Transactions {
// Short circuit if no transactions are available
Expand All @@ -208,7 +223,9 @@
delete(m.items, next)
heap.Pop(m.index)
}
m.cacheMu.Lock()
m.cache = nil
m.cacheMu.Unlock()

return ready
}
Expand All @@ -219,6 +236,8 @@
}

func (m *txSortedMap) flatten() types.Transactions {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
Expand Down Expand Up @@ -603,6 +622,7 @@

// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list
//
// Note local transaction won't be considered for eviction.
func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
Expand Down
6 changes: 3 additions & 3 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,11 @@ func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common
pool.mu.Lock()
defer pool.mu.Unlock()

pending := make(map[common.Address]types.Transactions)
pending := make(map[common.Address]types.Transactions, len(pool.pending))
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
queued := make(map[common.Address]types.Transactions)
queued := make(map[common.Address]types.Transactions, len(pool.queue))
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
Expand Down Expand Up @@ -1660,7 +1660,7 @@ type accountSet struct {
// derivations.
func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
as := &accountSet{
accounts: make(map[common.Address]struct{}),
accounts: make(map[common.Address]struct{}, len(addrs)),
signer: signer,
}
for _, addr := range addrs {
Expand Down
18 changes: 18 additions & 0 deletions core/types/hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package types

import (
"bytes"
"fmt"
"math"
"sync"

"golang.org/x/crypto/sha3"
Expand All @@ -37,6 +39,22 @@ var encodeBufferPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}

// getPooledBuffer retrieves a buffer from the pool and creates a byte slice of the
// requested size from it.
//
// The caller should return the *bytes.Buffer object back into encodeBufferPool after use!
// The returned byte slice must not be used after returning the buffer.
func getPooledBuffer(size uint64) ([]byte, *bytes.Buffer, error) {
if size > math.MaxInt {
return nil, nil, fmt.Errorf("can't get buffer of size %d", size)
}
buf := encodeBufferPool.Get().(*bytes.Buffer)
buf.Reset()
buf.Grow(int(size))
b := buf.Bytes()[:int(size)]
return b, buf, nil
}

// rlpHash encodes x and hashes the encoded bytes.
func rlpHash(x interface{}) (h common.Hash) {
sha := hasherPool.Get().(crypto.KeccakState)
Expand Down
21 changes: 12 additions & 9 deletions core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ var (
receiptStatusSuccessfulRLP = []byte{0x01}
)

// This error is returned when a typed receipt is decoded, but the string is empty.
var errEmptyTypedReceipt = errors.New("empty typed receipt bytes")
var errShortTypedReceipt = errors.New("typed receipt too short")

const (
// ReceiptStatusFailed is the status code of a transaction if execution failed.
Expand Down Expand Up @@ -189,7 +188,7 @@ func (r *Receipt) MarshalBinary() ([]byte, error) {
// DecodeRLP implements rlp.Decoder, and loads the consensus fields of a receipt
// from an RLP stream.
func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
kind, _, err := s.Kind()
kind, size, err := s.Kind()
switch {
case err != nil:
return err
Expand All @@ -201,15 +200,19 @@ func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
}
r.Type = LegacyTxType
return r.setFromRLP(dec)
case kind == rlp.String:
case kind == rlp.Byte:
return errShortTypedReceipt
default:
// It's an EIP-2718 typed tx receipt.
b, err := s.Bytes()
b, buf, err := getPooledBuffer(size)
if err != nil {
return err
}
defer encodeBufferPool.Put(buf)
if err := s.ReadBytes(b); err != nil {
return err
}
return r.decodeTyped(b)
default:
return rlp.ErrExpectedList
}
}

Expand All @@ -232,8 +235,8 @@ func (r *Receipt) UnmarshalBinary(b []byte) error {

// decodeTyped decodes a typed receipt from the canonical format.
func (r *Receipt) decodeTyped(b []byte) error {
if len(b) == 0 {
return errEmptyTypedReceipt
if len(b) <= 1 {
return errShortTypedReceipt
}
switch b[0] {
case DynamicFeeTxType, AccessListTxType, BlobTxType, L1MessageTxType:
Expand Down
2 changes: 1 addition & 1 deletion core/types/receipt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDecodeEmptyTypedReceipt(t *testing.T) {
input := []byte{0x80}
var r Receipt
err := rlp.DecodeBytes(input, &r)
if err != errEmptyTypedReceipt {
if err != errShortTypedReceipt {
t.Fatal("wrong error:", err)
}
}
Expand Down
Loading
Loading