Skip to content

Commit

Permalink
eth/downloader: add support for requests in downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
lightclient committed Aug 15, 2024
1 parent ad76f3f commit a2df8b2
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 15 deletions.
9 changes: 5 additions & 4 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ func (h *Header) SanityCheck() error {
// EmptyBody returns true if there is no additional 'body' to complete the header
// that is: no transactions, no uncles and no withdrawals.
func (h *Header) EmptyBody() bool {
if h.WithdrawalsHash != nil {
return h.TxHash == EmptyTxsHash && *h.WithdrawalsHash == EmptyWithdrawalsHash
}
return h.TxHash == EmptyTxsHash && h.UncleHash == EmptyUncleHash
var (
emptyWithdrawals = h.WithdrawalsHash == nil || *h.WithdrawalsHash == EmptyWithdrawalsHash
emptyRequests = h.RequestsHash == nil || *h.RequestsHash == EmptyReceiptsHash
)
return h.TxHash == EmptyTxsHash && h.UncleHash == EmptyUncleHash && emptyWithdrawals && emptyRequests
}

// EmptyReceipts returns true if there are no receipts for this header/block.
Expand Down
3 changes: 2 additions & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
txsHashes = make([]common.Hash, len(bodies))
uncleHashes = make([]common.Hash, len(bodies))
withdrawalHashes = make([]common.Hash, len(bodies))
requestsHashes = make([]common.Hash, len(bodies))
)
hasher := trie.NewStackTrie(nil)
for i, body := range bodies {
Expand All @@ -248,7 +249,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
res := &eth.Response{
Req: req,
Res: (*eth.BlockBodiesResponse)(&bodies),
Meta: [][]common.Hash{txsHashes, uncleHashes, withdrawalHashes},
Meta: [][]common.Hash{txsHashes, uncleHashes, withdrawalHashes, requestsHashes},
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/fetchers_concurrent_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the body data and delivering it to the downloader's queue.
func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
txs, uncles, withdrawals := packet.Res.(*eth.BlockBodiesResponse).Unpack()
hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes}
txs, uncles, withdrawals, requests := packet.Res.(*eth.BlockBodiesResponse).Unpack()
hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes, requests hashes}

accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2])
accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1], withdrawals, hashsets[2], requests, hashsets[3])
switch {
case err == nil && len(txs) == 0:
peer.log.Trace("Requested bodies delivered")
Expand Down
16 changes: 15 additions & 1 deletion eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []comm
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash,
uncleLists [][]*types.Header, uncleListHashes []common.Hash,
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash) (int, error) {
withdrawalLists [][]*types.Withdrawal, withdrawalListHashes []common.Hash,
requestsLists [][]*types.Request, requestsListHashes []common.Hash) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()

Expand All @@ -807,6 +808,19 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
return errInvalidBody
}
}
if header.RequestsHash == nil {
// nil hash means that requests should not be present in body
if requestsLists[index] != nil {
return errInvalidBody
}
} else { // non-nil hash: body must have requests
if requestsLists[index] == nil {
return errInvalidBody
}
if requestsListHashes[index] != *header.RequestsHash {
return errInvalidBody
}
}
// Blocks must have a number of blobs corresponding to the header gas usage,
// and zero before the Cancun hardfork.
var blobs int
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func XTestDelivery(t *testing.T) {
uncleHashes[i] = types.CalcUncleHash(uncles)
}
time.Sleep(100 * time.Millisecond)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil)
_, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes, nil, nil, nil, nil)
if err != nil {
fmt.Printf("delivered %d bodies %v\n", len(txset), err)
}
Expand Down
6 changes: 5 additions & 1 deletion eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
txsHashes = make([]common.Hash, len(res.BlockBodiesResponse))
uncleHashes = make([]common.Hash, len(res.BlockBodiesResponse))
withdrawalHashes = make([]common.Hash, len(res.BlockBodiesResponse))
requestsHashes = make([]common.Hash, len(res.BlockBodiesResponse))
)
hasher := trie.NewStackTrie(nil)
for i, body := range res.BlockBodiesResponse {
Expand All @@ -321,8 +322,11 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
if body.Withdrawals != nil {
withdrawalHashes[i] = types.DeriveSha(types.Withdrawals(body.Withdrawals), hasher)
}
if body.Requests != nil {
requestsHashes[i] = types.DeriveSha(types.Requests(body.Requests), hasher)
}
}
return [][]common.Hash{txsHashes, uncleHashes, withdrawalHashes}
return [][]common.Hash{txsHashes, uncleHashes, withdrawalHashes, requestsHashes}
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
Expand Down
9 changes: 5 additions & 4 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,22 @@ type BlockBody struct {
Transactions []*types.Transaction // Transactions contained within a block
Uncles []*types.Header // Uncles contained within a block
Withdrawals []*types.Withdrawal `rlp:"optional"` // Withdrawals contained within a block
Requests []*types.Request `rlp:"optional"` // Requests contained within a block
}

// Unpack retrieves the transactions and uncles from the range packet and returns
// them in a split flat format that's more consistent with the internal data structures.
func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Header, [][]*types.Withdrawal) {
// TODO(matt): add support for withdrawals to fetchers
func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Header, [][]*types.Withdrawal, [][]*types.Request) {
var (
txset = make([][]*types.Transaction, len(*p))
uncleset = make([][]*types.Header, len(*p))
withdrawalset = make([][]*types.Withdrawal, len(*p))
requestset = make([][]*types.Request, len(*p))
)
for i, body := range *p {
txset[i], uncleset[i], withdrawalset[i] = body.Transactions, body.Uncles, body.Withdrawals
txset[i], uncleset[i], withdrawalset[i], requestset[i] = body.Transactions, body.Uncles, body.Withdrawals, body.Requests
}
return txset, uncleset, withdrawalset
return txset, uncleset, withdrawalset, requestset
}

// GetReceiptsRequest represents a block receipts query.
Expand Down

0 comments on commit a2df8b2

Please sign in to comment.