Skip to content

Commit

Permalink
Cherry-pick fix for eth/65 (celo/66) bug which can cause the tx pool …
Browse files Browse the repository at this point in the history
…to be blocked (ethereum#1484)

* (Cherry-pick from upstream) Merge pull request ethereum#21032 from karalabe/skip-announce-goroutine-eth64

eth: skip transaction announcer goroutine on eth<65

* Update eth65 -> celo66 in cherry-picked code

* (Cherry-pick form upstream) eth: don't block if transaction broadcast loop fails (ethereum#21255)

* eth: don't block if transaction broadcast loop is returned

* eth: kick out peer if we failed to send message

* eth: address comment

Conflicts:
	eth/peer.go

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Co-authored-by: gary rong <garyrong0905@gmail.com>
  • Loading branch information
3 people authored Apr 6, 2021
1 parent bb25509 commit 2757074
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
}

// Register the peer locally
if err := pm.peers.Register(p); err != nil {
if err := pm.peers.Register(p, pm.removePeer); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
Expand Down
21 changes: 13 additions & 8 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,19 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(ha
// broadcastBlocks is a write loop that multiplexes blocks and block accouncements
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *peer) broadcastBlocks() {
func (p *peer) broadcastBlocks(removePeer func(string)) {
for {
select {
case prop := <-p.queuedBlocks:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
removePeer(p.id)
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)

case block := <-p.queuedBlockAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
removePeer(p.id)
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
Expand All @@ -154,7 +156,7 @@ func (p *peer) broadcastBlocks() {
// broadcastTransactions is a write loop that schedules transaction broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *peer) broadcastTransactions() {
func (p *peer) broadcastTransactions(removePeer func(string)) {
var (
queue []common.Hash // Queue of hashes to broadcast as full transactions
done chan struct{} // Non-nil if background broadcaster is running
Expand Down Expand Up @@ -205,6 +207,7 @@ func (p *peer) broadcastTransactions() {
done = nil

case <-fail:
removePeer(p.id)
return

case <-p.term:
Expand All @@ -216,7 +219,7 @@ func (p *peer) broadcastTransactions() {
// announceTransactions is a write loop that schedules transaction broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *peer) announceTransactions() {
func (p *peer) announceTransactions(removePeer func(string)) {
var (
queue []common.Hash // Queue of hashes to announce as transaction stubs
done chan struct{} // Non-nil if background announcer is running
Expand Down Expand Up @@ -267,6 +270,7 @@ func (p *peer) announceTransactions() {
done = nil

case <-fail:
removePeer(p.id)
return

case <-p.term:
Expand Down Expand Up @@ -722,7 +726,7 @@ func newPeerSet() *peerSet {
// Register injects a new peer into the working set, or returns an error if the
// peer is already known. If a new peer it registered, its broadcast loop is also
// started.
func (ps *peerSet) Register(p *peer) error {
func (ps *peerSet) Register(p *peer, removePeer func(string)) error {
ps.lock.Lock()
defer ps.lock.Unlock()

Expand All @@ -734,10 +738,11 @@ func (ps *peerSet) Register(p *peer) error {
}
ps.peers[p.id] = p

go p.broadcastBlocks()
go p.broadcastTransactions()
go p.announceTransactions()

go p.broadcastBlocks(removePeer)
go p.broadcastTransactions(removePeer)
if p.version >= istanbul.Celo66 {
go p.announceTransactions(removePeer)
}
return nil
}

Expand Down

0 comments on commit 2757074

Please sign in to comment.