p2p: refactor MaxPendingPeers handling #3981

Merged 1 commit on Apr 28, 2022.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go

@@ -491,7 +491,7 @@ var (
 	}
 	MaxPendingPeersFlag = cli.IntFlag{
 		Name:  "maxpendpeers",
-		Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
+		Usage: "Maximum number of TCP connections pending to become connected peers",
 		Value: node.DefaultConfig.P2P.MaxPendingPeers,
 	}
 	ListenPortFlag = cli.IntFlag{
9 changes: 5 additions & 4 deletions node/defaults.go

@@ -43,9 +43,10 @@ var DefaultConfig = Config{
 	WSPort:    DefaultWSPort,
 	WSModules: []string{"net", "web3"},
 	P2P: p2p.Config{
-		ListenAddr:   ":30303",
-		ListenAddr65: ":30304",
-		MaxPeers:     100,
-		NAT:          nat.Any(),
+		ListenAddr:      ":30303",
+		ListenAddr65:    ":30304",
+		MaxPeers:        100,
+		MaxPendingPeers: 50,
+		NAT:             nat.Any(),
 	},
 }
3 changes: 0 additions & 3 deletions p2p/dial.go

@@ -146,9 +146,6 @@ type dialConfig struct {
 }

 func (cfg dialConfig) withDefaults() dialConfig {
-	if cfg.maxActiveDials == 0 {
-		cfg.maxActiveDials = defaultMaxPendingPeers
-	}
 	if cfg.log == nil {
 		cfg.log = log.Root()
 	}
82 changes: 46 additions & 36 deletions p2p/server.go

@@ -24,6 +24,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"golang.org/x/sync/semaphore"
 	"net"
 	"sort"
 	"sync"
@@ -52,8 +53,7 @@ const (
 	discmixTimeout = 5 * time.Second

 	// Connectivity defaults.
-	defaultMaxPendingPeers = 50
-	defaultDialRatio       = 3
+	defaultDialRatio = 3

 	// This time limits inbound connection attempts per source IP.
 	inboundThrottleTime = 30 * time.Second
@@ -79,7 +79,7 @@ type Config struct {

 	// MaxPendingPeers is the maximum number of peers that can be pending in the
 	// handshake phase, counted separately for inbound and outbound connections.
-	// Zero defaults to preset values.
+	// It must be greater than zero.
 	MaxPendingPeers int `toml:",omitempty"`

 	// DialRatio controls the ratio of inbound to dialed connections.
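A minimal sketch of what this comment change implies for callers, assuming only what the diff shows (the values mirror the new node/defaults.go; key is an illustrative private key). A zero MaxPendingPeers no longer falls back to defaultMaxPendingPeers, so the field must now be set explicitly:

	cfg := p2p.Config{
		PrivateKey:      key, // illustrative *ecdsa.PrivateKey
		MaxPeers:        100,
		MaxPendingPeers: 50, // must be > 0 now; zero makes Server.Start return an error
		ListenAddr:      ":30303",
		NAT:             nat.Any(),
	}

This fail-fast validation in Server.Start is why the defaults, tests, and simulation adapter below all gain an explicit MaxPendingPeers entry.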
@@ -191,7 +191,9 @@ type Server struct {
 	dialsched *dialScheduler

 	// Channels into the run loop.
-	quit          chan struct{}
+	quitCtx       context.Context
+	quitFunc      context.CancelFunc
+	quit          <-chan struct{}
 	addtrusted    chan *enode.Node
 	removetrusted chan *enode.Node
 	peerOp        chan peerOpFunc
@@ -409,10 +411,10 @@ func (srv *Server) Stop() {
 		return
 	}
 	srv.running = false
-	close(srv.quit)
+	srv.quitFunc()
 	if srv.listener != nil {
 		// this unblocks listener Accept
-		srv.listener.Close()
+		_ = srv.listener.Close()
 	}
 	if srv.nodedb != nil {
 		srv.nodedb.Close()
@@ -476,13 +478,17 @@ func (srv *Server) Start(ctx context.Context) error {
 	if srv.PrivateKey == nil {
 		return errors.New("Server.PrivateKey must be set to a non-nil key")
 	}
+	if srv.MaxPendingPeers <= 0 {
+		return errors.New("MaxPendingPeers must be greater than zero")
+	}
 	if srv.newTransport == nil {
 		srv.newTransport = newRLPX
 	}
 	if srv.listenFunc == nil {
 		srv.listenFunc = net.Listen
 	}
-	srv.quit = make(chan struct{})
+	srv.quitCtx, srv.quitFunc = context.WithCancel(ctx)
+	srv.quit = srv.quitCtx.Done()
 	srv.delpeer = make(chan peerDrop)
 	srv.checkpointPostHandshake = make(chan *conn)
 	srv.checkpointAddPeer = make(chan *conn)
@@ -495,11 +501,11 @@ func (srv *Server) Start(ctx context.Context) error {
 		return err
 	}
 	if srv.ListenAddr != "" {
-		if err := srv.setupListening(); err != nil {
+		if err := srv.setupListening(srv.quitCtx); err != nil {
 			return err
 		}
 	}
-	if err := srv.setupDiscovery(ctx); err != nil {
+	if err := srv.setupDiscovery(srv.quitCtx); err != nil {
 		return err
 	}
 	srv.setupDialScheduler()
@@ -586,8 +592,8 @@ func (srv *Server) setupDiscovery(ctx context.Context) error {
 		srv.loopWG.Add(1)
 		go func() {
 			defer debug.LogPanic()
+			defer srv.loopWG.Done()
 			nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
-			srv.loopWG.Done()
 		}()
 	}
 }
@@ -682,7 +688,7 @@ func (srv *Server) maxDialedConns() (limit int) {
 	return limit
 }

-func (srv *Server) setupListening() error {
+func (srv *Server) setupListening(ctx context.Context) error {
 	// Launch the listener.
 	listener, err := srv.listenFunc("tcp", srv.ListenAddr)
 	if err != nil {
@@ -698,14 +704,18 @@ func (srv *Server) setupListening() error {
 			srv.loopWG.Add(1)
 			go func() {
 				defer debug.LogPanic()
+				defer srv.loopWG.Done()
 				nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
-				srv.loopWG.Done()
 			}()
 		}
 	}

 	srv.loopWG.Add(1)
-	go srv.listenLoop()
+	go func() {
+		defer debug.LogPanic()
+		defer srv.loopWG.Done()
+		srv.listenLoop(ctx)
+	}()
 	return nil
 }

Expand Down Expand Up @@ -857,32 +867,26 @@ func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *

// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
defer debug.LogPanic()
func (srv *Server) listenLoop(ctx context.Context) {
srv.log.Trace("TCP listener up", "addr", srv.listener.Addr())

// The slots channel limits accepts of new connections.
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
// The slots limit accepts of new connections.
slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers))

// Wait for slots to be returned on exit. This ensures all connection goroutines
// are down before listenLoop returns.
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
}
_ = slots.Acquire(ctx, int64(srv.MaxPendingPeers))
}()

for {
// Wait for a free slot before accepting.
<-slots
if slotErr := slots.Acquire(ctx, 1); slotErr != nil {
if !errors.Is(slotErr, context.Canceled) {
srv.log.Error("Failed to get a peer connection slot", "err", slotErr)
}
return
}

var (
fd net.Conn
@@ -899,18 +903,23 @@ func (srv *Server) listenLoop() {
 				time.Sleep(time.Millisecond * 200)
 				continue
 			} else if err != nil {
-				srv.log.Trace("Read error", "err", err)
-				slots <- struct{}{}
+				// Log the error unless the server is shutting down.
+				select {
+				case <-srv.quit:
+				default:
+					srv.log.Error("Server listener failed to accept a connection", "err", err)
+				}
+				slots.Release(1)
 				return
 			}
 			break
 		}

 		remoteIP := netutil.AddrIP(fd.RemoteAddr())
 		if err := srv.checkInboundConn(fd, remoteIP); err != nil {
-			srv.log.Trace("Rejected inbound connnection", "addr", fd.RemoteAddr(), "err", err)
-			fd.Close()
-			slots <- struct{}{}
+			srv.log.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
+			_ = fd.Close()
+			slots.Release(1)
Collaborator:
Better to do the release in a defer, otherwise we can get a deadlock on panic.

@battlmonstr (Contributor, Author), Apr 27, 2022:
I'm not sure how to do this without a bigger refactoring of the structure of this function.

defer only runs on return, and can't be cancelled once it is set.
Here the logic wants to release either before the goroutine starts (if the checks fail), or after the setup goroutine ends.

One way to do it would be like so (pseudocode):

slots.Acquire()
preSetupBarrier = WaitGroup(1)
go {
    defer slots.Release()
    conn, err = preSetup(preSetupBarrier)
    if err != nil { return }
    setup(conn)
}
preSetupBarrier.Wait()

//

func preSetup(preSetupBarrier) (conn, err) {
    defer preSetupBarrier.Done()
    conn, err = accept()
    if err != nil { return nil, err }
    err = checks(conn)
    return conn, err
}

It is definitely neat and future-proof with a single Release, but it adds a barrier and a new func, such that the .Done() call is also performed by a defer to avoid repeating it on multiple return paths.
Besides, log.Trace and fd.Close are primitives and are unlikely to panic.

Do you think this refactoring is worth it?

Also, I wonder whether we'd like to keep the ability to apply patches from geth upstream.

@battlmonstr (Contributor, Author), Apr 27, 2022:
I'm also considering using a bounded channel, but I feel there would have to be 2 channels for this: it wants to allow parallel setup while queueing up accepted conns.
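
A minimal Go rendering of the pseudocode above, assuming the slots semaphore from this diff; acceptAndCheck is a hypothetical helper that performs Accept plus the inbound checks and calls preSetup.Done() once they finish:

	if err := slots.Acquire(ctx, 1); err != nil {
		return
	}
	var preSetup sync.WaitGroup
	preSetup.Add(1)
	go func() {
		defer slots.Release(1) // single release point; also runs while a panic unwinds
		fd, err := srv.acceptAndCheck(&preSetup) // hypothetical helper
		if err != nil {
			return
		}
		_ = srv.SetupConn(fd, inboundConn, nil)
	}()
	preSetup.Wait() // do not loop back to Accept until the pre-setup phase is done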

 			continue
 		}
 		if remoteIP != nil {
@@ -923,8 +932,9 @@ func (srv *Server) listenLoop() {
 		}
 		go func() {
 			defer debug.LogPanic()
-			srv.SetupConn(fd, inboundConn, nil)
-			slots <- struct{}{}
+			defer slots.Release(1)
+			// The error is logged in Server.setupConn().
+			_ = srv.SetupConn(fd, inboundConn, nil)
 		}()
 	}
 }
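One property worth noting in the new listenLoop: the deferred Acquire of the full weight doubles as a shutdown barrier, since acquiring the entire capacity of a semaphore.Weighted can only succeed once every outstanding unit has been released. A self-contained sketch of that behavior (names and timings are illustrative, not part of this PR):

	package main

	import (
		"context"
		"fmt"
		"time"

		"golang.org/x/sync/semaphore"
	)

	func main() {
		const maxPending = 3
		slots := semaphore.NewWeighted(maxPending)
		ctx := context.Background()

		for i := 0; i < maxPending; i++ {
			_ = slots.Acquire(ctx, 1) // take one slot per in-flight "handshake"
			go func(id int) {
				defer slots.Release(1)
				time.Sleep(10 * time.Millisecond) // stand-in for the handshake work
				fmt.Println("handshake done:", id)
			}(i)
		}

		// Acquiring the full weight blocks until every slot has been released,
		// i.e. until all workers are done, which is the same trick the deferred
		// Acquire in listenLoop uses to wait for its connection goroutines.
		_ = slots.Acquire(ctx, maxPending)
		fmt.Println("all pending slots drained")
	}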
89 changes: 48 additions & 41 deletions p2p/server_test.go

@@ -69,12 +69,13 @@ func (c *testTransport) close(err error) {

 func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server {
 	config := Config{
-		Name:        "test",
-		MaxPeers:    10,
-		ListenAddr:  "127.0.0.1:0",
-		NoDiscovery: true,
-		PrivateKey:  newkey(),
-		Log:         testlog.Logger(t, log.LvlError),
+		Name:            "test",
+		MaxPeers:        10,
+		MaxPendingPeers: 10,
+		ListenAddr:      "127.0.0.1:0",
+		NoDiscovery:     true,
+		PrivateKey:      newkey(),
+		Log:             testlog.Logger(t, log.LvlError),
 	}
 	server := &Server{
 		Config: config,
@@ -211,18 +212,20 @@ func TestServerDial(t *testing.T) {
 // This test checks that RemovePeer disconnects the peer if it is connected.
 func TestServerRemovePeerDisconnect(t *testing.T) {
 	srv1 := &Server{Config: Config{
-		PrivateKey:  newkey(),
-		MaxPeers:    1,
-		NoDiscovery: true,
-		Log:         testlog.Logger(t, log.LvlTrace).New("server", "1"),
+		PrivateKey:      newkey(),
+		MaxPeers:        1,
+		MaxPendingPeers: 1,
+		NoDiscovery:     true,
+		Log:             testlog.Logger(t, log.LvlTrace).New("server", "1"),
 	}}
 	srv2 := &Server{Config: Config{
-		PrivateKey:  newkey(),
-		MaxPeers:    1,
-		NoDiscovery: true,
-		NoDial:      true,
-		ListenAddr:  "127.0.0.1:0",
-		Log:         testlog.Logger(t, log.LvlTrace).New("server", "2"),
+		PrivateKey:      newkey(),
+		MaxPeers:        1,
+		MaxPendingPeers: 1,
+		NoDiscovery:     true,
+		NoDial:          true,
+		ListenAddr:      "127.0.0.1:0",
+		Log:             testlog.Logger(t, log.LvlTrace).New("server", "2"),
 	}}
 	if err := srv1.TestStart(); err != nil {
 		t.Fatal("cant start srv1")
@@ -249,12 +252,13 @@ func TestServerAtCap(t *testing.T) {
 	trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
 	srv := &Server{
 		Config: Config{
-			PrivateKey:   newkey(),
-			MaxPeers:     10,
-			NoDial:       true,
-			NoDiscovery:  true,
-			TrustedNodes: []*enode.Node{newNode(trustedID, "")},
-			Log:          testlog.Logger(t, log.LvlTrace),
+			PrivateKey:      newkey(),
+			MaxPeers:        10,
+			MaxPendingPeers: 10,
+			NoDial:          true,
+			NoDiscovery:     true,
+			TrustedNodes:    []*enode.Node{newNode(trustedID, "")},
+			Log:             testlog.Logger(t, log.LvlTrace),
 		},
 	}
 	if err := srv.TestStart(); err != nil {
@@ -325,12 +329,13 @@ func TestServerPeerLimits(t *testing.T) {

 	srv := &Server{
 		Config: Config{
-			PrivateKey:  srvkey,
-			MaxPeers:    0,
-			NoDial:      true,
-			NoDiscovery: true,
-			Protocols:   []Protocol{discard},
-			Log:         testlog.Logger(t, log.LvlTrace),
+			PrivateKey:      srvkey,
+			MaxPeers:        0,
+			MaxPendingPeers: 50,
+			NoDial:          true,
+			NoDiscovery:     true,
+			Protocols:       []Protocol{discard},
+			Log:             testlog.Logger(t, log.LvlTrace),
 		},
 		newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp },
 	}
@@ -432,12 +437,13 @@ func TestServerSetupConn(t *testing.T) {
 	for i, test := range tests {
 		t.Run(test.wantCalls, func(t *testing.T) {
 			cfg := Config{
-				PrivateKey:  srvkey,
-				MaxPeers:    10,
-				NoDial:      true,
-				NoDiscovery: true,
-				Protocols:   []Protocol{discard},
-				Log:         testlog.Logger(t, log.LvlTrace),
+				PrivateKey:      srvkey,
+				MaxPeers:        10,
+				MaxPendingPeers: 10,
+				NoDial:          true,
+				NoDiscovery:     true,
+				Protocols:       []Protocol{discard},
+				Log:             testlog.Logger(t, log.LvlTrace),
 			}
 			srv := &Server{
 				Config: cfg,
@@ -518,13 +524,14 @@ func TestServerInboundThrottle(t *testing.T) {
 	newTransportCalled := make(chan struct{})
 	srv := &Server{
 		Config: Config{
-			PrivateKey:  newkey(),
-			ListenAddr:  "127.0.0.1:0",
-			MaxPeers:    10,
-			NoDial:      true,
-			NoDiscovery: true,
-			Protocols:   []Protocol{discard},
-			Log:         testlog.Logger(t, log.LvlTrace),
+			PrivateKey:      newkey(),
+			ListenAddr:      "127.0.0.1:0",
+			MaxPeers:        10,
+			MaxPendingPeers: 10,
+			NoDial:          true,
+			NoDiscovery:     true,
+			Protocols:       []Protocol{discard},
+			Log:             testlog.Logger(t, log.LvlTrace),
 		},
 		newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
 			newTransportCalled <- struct{}{}
1 change: 1 addition & 0 deletions p2p/simulations/adapters/inproc.go

@@ -96,6 +96,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
 		P2P: p2p.Config{
 			PrivateKey:      config.PrivateKey,
 			MaxPeers:        math.MaxInt32,
+			MaxPendingPeers: 50,
 			NoDiscovery:     true,
 			Dialer:          s,
 			EnableMsgEvents: config.EnableMsgEvents,