Skip to content

Commit

Permalink
eth/protocols/peer: check cardinality of known block and tx hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
andrepatta committed Nov 4, 2024
1 parent 5a601dd commit 1a9c843
Showing 1 changed file with 53 additions and 12 deletions.
65 changes: 53 additions & 12 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ type Peer struct {
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer

txpool TxPool // Transaction pool used by the broadcasters for liveness checks
knownTxs *knownCache // Set of transaction hashes known to be known by this peer
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests

Expand All @@ -102,8 +102,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
knownBlocks: newKnownCache(maxKnownBlocks),
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
txBroadcast: make(chan []common.Hash),
Expand Down Expand Up @@ -172,13 +172,19 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool {
// never be propagated to this particular peer.
func (p *Peer) markBlock(hash common.Hash) {
// If we reached the memory allowance, drop a previously known block hash
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(hash)
}

// markTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *Peer) markTransaction(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownTxs.Cardinality() >= maxKnownTxs {
p.knownTxs.Pop()
}
p.knownTxs.Add(hash)
}

Expand All @@ -193,6 +199,9 @@ func (p *Peer) markTransaction(hash common.Hash) {
// tests that directly send messages without having to do the asyn queueing.
func (p *Peer) SendTransactions(txs types.Transactions) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
p.knownTxs.Pop()
}
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
Expand All @@ -206,7 +215,12 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
select {
case p.txBroadcast <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
case <-p.term:
p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
}
Expand All @@ -220,7 +234,12 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
}

Expand All @@ -231,7 +250,12 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
select {
case p.txAnnounce <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
case <-p.term:
p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
}
Expand All @@ -240,10 +264,14 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
p.knownTxs.Add(hashes...)

for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
// Not packed into PooledTransactionsPacket to avoid RLP decoding
return p2p.Send(p.rw, PooledTransactionsMsg, &PooledTransactionsRLPPacket66{
return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
RequestId: id,
PooledTransactionsRLPPacket: txs,
})
Expand All @@ -253,8 +281,12 @@ func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs [
// a hash notification.
func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
// Mark all the block hashes as known, but ensure we don't overflow our limits
p.knownBlocks.Add(hashes...)

for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
p.knownBlocks.Pop()
}
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
request := make(NewBlockHashesPacket, len(hashes))
for i := 0; i < len(hashes); i++ {
request[i].Hash = hashes[i]
Expand All @@ -270,6 +302,9 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
select {
case p.queuedBlockAnns <- block:
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
Expand All @@ -279,6 +314,9 @@ func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
// SendNewBlock propagates an entire block to a remote peer.
func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
Block: block,
Expand All @@ -292,6 +330,9 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
Expand Down

0 comments on commit 1a9c843

Please sign in to comment.