diff --git a/eth/handler.go b/eth/handler.go index a5a62b894dba..13fa7019357c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -456,44 +456,51 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } } -// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to +// BroadcastTransactions will propagate a batch of transactions +// - To a square root of all peers +// - And, separately, as announcements to all peers which are not known to // already have the given transaction. -func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) { +func (h *handler) BroadcastTransactions(txs types.Transactions) { var ( - txset = make(map[*ethPeer][]common.Hash) - annos = make(map[*ethPeer][]common.Hash) + annoCount int // Count of announcements made + annoPeers int + directCount int // Count of the txs sent directly to peers + directPeers int // Count of the peers that were sent transactions directly + + txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly + annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce + ) // Broadcast transactions to a batch of peers not knowing about it - if propagate { - for _, tx := range txs { - peers := h.peers.peersWithoutTransaction(tx.Hash()) - - // Send the block to a subset of our peers - transfer := peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range transfer { - txset[peer] = append(txset[peer], tx.Hash()) - } - log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer)) - } - for peer, hashes := range txset { - peer.AsyncSendTransactions(hashes) - } - return - } - // Otherwise only broadcast the announcement to peers for _, tx := range txs { peers := h.peers.peersWithoutTransaction(tx.Hash()) - for _, peer := range peers { + // Send the tx unconditionally to a subset of our peers + numDirect := int(math.Sqrt(float64(len(peers)))) + for _, peer := range peers[:numDirect] { + txset[peer] = append(txset[peer], tx.Hash()) + } + // For the remaining peers, send announcement only + for _, peer := range peers[numDirect:] { annos[peer] = append(annos[peer], tx.Hash()) } } + for peer, hashes := range txset { + directPeers++ + directCount += len(hashes) + peer.AsyncSendTransactions(hashes) + } for peer, hashes := range annos { + annoPeers++ + annoCount += len(hashes) if peer.Version() >= eth.ETH65 { peer.AsyncSendPooledTransactionHashes(hashes) } else { peer.AsyncSendTransactions(hashes) } } + log.Debug("Transaction broadcast", "txs", len(txs), + "announce packs", annoPeers, "announced hashes", annoCount, + "tx packs", directPeers, "broadcast txs", directCount) } // minedBroadcastLoop sends mined blocks to connected peers. @@ -511,13 +518,10 @@ func (h *handler) minedBroadcastLoop() { // txBroadcastLoop announces new transactions to connected peers. func (h *handler) txBroadcastLoop() { defer h.wg.Done() - for { select { case event := <-h.txsCh: - h.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers - h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest - + h.BroadcastTransactions(event.Txs) case <-h.txsSub.Err(): return } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 74ec2f06543e..328396d510f2 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() { if done == nil && len(queue) > 0 { // Pile transaction hashes until we reach our allowed network limit var ( - hashes []common.Hash + count int pending []common.Hash size common.StorageSize ) - for i := 0; i < len(queue) && size < maxTxPacketSize; i++ { - if p.txpool.Get(queue[i]) != nil { - pending = append(pending, queue[i]) + for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { + if p.txpool.Get(queue[count]) != nil { + pending = append(pending, queue[count]) size += common.HashLength } - hashes = append(hashes, queue[i]) } - queue = queue[:copy(queue, queue[len(hashes):])] + // Shift and trim queue + queue = queue[:copy(queue, queue[count:])] // If there's anything available to transfer, fire up an async writer if len(pending) > 0 {