diff --git a/eth/handler.go b/eth/handler.go index f9854766c4..8844944b11 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -321,7 +321,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } - defer h.removePeer(peer.ID()) + defer h.unregisterPeer(peer.ID()) p := h.peers.peer(peer.ID()) if p == nil { @@ -418,9 +418,17 @@ func (h *handler) runTrustExtension(peer *trust.Peer, handler trust.Handler) err return handler(peer) } -// removePeer unregisters a peer from the downloader and fetchers, removes it from -// the set of tracked peers and closes the network connection to it. +// removePeer requests disconnection of a peer. func (h *handler) removePeer(id string) { + peer := h.peers.peer(id) + if peer != nil { + // Hard disconnect at the networking layer. Handler will get an EOF and terminate the peer. defer unregisterPeer will do the cleanup task after then. + peer.Peer.Disconnect(p2p.DiscUselessPeer) + } +} + +// unregisterPeer removes a peer from the downloader, fetchers and main peer set. +func (h *handler) unregisterPeer(id string) { // Create a custom logger to avoid printing the entire id var logger log.Logger if len(id) < 16 { @@ -448,8 +456,6 @@ func (h *handler) removePeer(id string) { if err := h.peers.unregisterPeer(id); err != nil { logger.Error("Ethereum peer removal failed", "err", err) } - // Hard disconnect at the networking layer - peer.Peer.Disconnect(p2p.DiscUselessPeer) } func (h *handler) Start(maxPeers int) { diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index e85c74e6f2..1aedadd225 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -641,14 +641,20 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo defer p2pLocal.Close() defer p2pRemote.Close() - local := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{1}, "", nil), p2pLocal, handler.txpool) - remote := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{2}, "", nil), p2pRemote, handler.txpool) + local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool) + remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool) defer local.Close() defer remote.Close() - go handler.handler.runEthPeer(local, func(peer *eth.Peer) error { - return eth.Handle((*ethHandler)(handler.handler), peer) - }) + handlerDone := make(chan struct{}) + go func() { + defer close(handlerDone) + handler.handler.runEthPeer(local, func(peer *eth.Peer) error { + err := eth.Handle((*ethHandler)(handler.handler), peer) + return err + }) + }() + // Run the handshake locally to avoid spinning up a remote handler var ( genesis = handler.chain.Genesis() @@ -685,6 +691,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo // Verify that the remote peer is maintained or dropped if drop { + <-handlerDone if peers := handler.handler.peers.len(); peers != 0 { t.Fatalf("peer count mismatch: have %d, want %d", peers, 0) } diff --git a/p2p/peer.go b/p2p/peer.go index cdfaf7f21b..aa80db6db3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -116,7 +116,8 @@ type Peer struct { disc chan DiscReason // events receives message send / receive events if set - events *event.Feed + events *event.Feed + testPipe *MsgPipeRW // for testing } // NewPeer returns a peer for testing purposes. @@ -129,6 +130,15 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer { return peer } +// NewPeerPipe creates a peer for testing purposes. +// The message pipe given as the last parameter is closed when +// Disconnect is called on the peer. +func NewPeerPipe(id enode.ID, name string, caps []Cap, pipe *MsgPipeRW) *Peer { + p := NewPeer(id, name, caps) + p.testPipe = pipe + return p +} + // NewPeerWithProtocols returns a peer for testing purposes. func NewPeerWithProtocols(id enode.ID, protocols []Protocol, name string, caps []Cap) *Peer { pipe, _ := net.Pipe() @@ -196,6 +206,10 @@ func (p *Peer) LocalAddr() net.Addr { // Disconnect terminates the peer connection with the given reason. // It returns immediately and does not wait until the connection is closed. func (p *Peer) Disconnect(reason DiscReason) { + if p.testPipe != nil { + p.testPipe.Close() + } + select { case p.disc <- reason: case <-p.closed: