Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/handler, broadcast: optimize tx broadcast mechanism #22176

Merged
merged 3 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,44 +456,51 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}
}

// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
// BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers
// - And, separately, as announcements to all peers which are not known to
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) {
func (h *handler) BroadcastTransactions(txs types.Transactions) {
var (
txset = make(map[*ethPeer][]common.Hash)
annos = make(map[*ethPeer][]common.Hash)
annoCount int // Count of announcements made
annoPeers int
directCount int // Count of the txs sent directly to peers
directPeers int // Count of the peers that were sent transactions directly

txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce

)
// Broadcast transactions to a batch of peers not knowing about it
if propagate {
for _, tx := range txs {
peers := h.peers.peersWithoutTransaction(tx.Hash())

// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
txset[peer] = append(txset[peer], tx.Hash())
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer))
}
for peer, hashes := range txset {
peer.AsyncSendTransactions(hashes)
}
return
}
// Otherwise only broadcast the announcement to peers
for _, tx := range txs {
peers := h.peers.peersWithoutTransaction(tx.Hash())
for _, peer := range peers {
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
for _, peer := range peers[:numDirect] {
txset[peer] = append(txset[peer], tx.Hash())
}
// For the remaining peers, send announcement only
for _, peer := range peers[numDirect:] {
annos[peer] = append(annos[peer], tx.Hash())
}
}
for peer, hashes := range txset {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
if peer.Version() >= eth.ETH65 {
peer.AsyncSendPooledTransactionHashes(hashes)
} else {
peer.AsyncSendTransactions(hashes)
}
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand All @@ -511,13 +518,10 @@ func (h *handler) minedBroadcastLoop() {
// txBroadcastLoop announces new transactions to connected peers.
func (h *handler) txBroadcastLoop() {
defer h.wg.Done()

for {
select {
case event := <-h.txsCh:
h.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest

h.BroadcastTransactions(event.Txs)
case <-h.txsSub.Err():
return
}
Expand Down
8 changes: 4 additions & 4 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() {
if done == nil && len(queue) > 0 {
// Pile transaction hashes until we reach our allowed network limit
var (
hashes []common.Hash
i int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since i is no longer only used as a loop index, could you give it a more explicit name?

pending []common.Hash
size common.StorageSize
)
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
for i = 0; i < len(queue) && size < maxTxPacketSize; i++ {
if p.txpool.Get(queue[i]) != nil {
pending = append(pending, queue[i])
size += common.HashLength
}
hashes = append(hashes, queue[i])
}
queue = queue[:copy(queue, queue[len(hashes):])]
// Shift and trim queue
queue = queue[:copy(queue, queue[i:])]

// If there's anything available to transfer, fire up an async writer
if len(pending) > 0 {
Expand Down