diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 13cc78e4da..08fbdd524e 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -132,6 +132,7 @@ var ( utils.PruneAncientDataFlag, utils.ListenPortFlag, utils.MaxPeersFlag, + utils.MaxPeersPerIPFlag, utils.MaxPendingPeersFlag, utils.MiningEnabledFlag, utils.MinerThreadsFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index ab813a92f9..690a2e8251 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -176,6 +176,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.DNSDiscoveryFlag, utils.ListenPortFlag, utils.MaxPeersFlag, + utils.MaxPeersPerIPFlag, utils.MaxPendingPeersFlag, utils.NATFlag, utils.NoDiscoverFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0dbac558e2..e05cdc70a5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -702,6 +702,13 @@ var ( Usage: "Maximum number of network peers (network disabled if set to 0)", Value: node.DefaultConfig.P2P.MaxPeers, } + + MaxPeersPerIPFlag = cli.IntFlag{ + Name: "maxpeersperip", + Usage: "Maximum number of network peers from a single IP address, (default used if set to <= 0, which is same as MaxPeers)", + Value: node.DefaultConfig.P2P.MaxPeersPerIP, + } + MaxPendingPeersFlag = cli.IntFlag{ Name: "maxpendpeers", Usage: "Maximum number of pending connection attempts (defaults used if set to 0)", @@ -1282,6 +1289,15 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) { cfg.MaxPeers = lightPeers } } + // if max peers per ip is not set, use max peers + if cfg.MaxPeersPerIP <= 0 { + cfg.MaxPeersPerIP = cfg.MaxPeers + } + // flag like: `--maxpeersperip 10` could override the setting in config.toml + if ctx.GlobalIsSet(MaxPeersPerIPFlag.Name) { + cfg.MaxPeersPerIP = ctx.GlobalInt(MaxPeersPerIPFlag.Name) + } + if !(lightClient || lightServer) { lightPeers = 0 } diff --git a/eth/backend.go b/eth/backend.go index b137118e74..bdc93c9783 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -656,7 +656,7 @@ func (s *Ethereum) Start() error { maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested - s.handler.Start(maxPeers) + s.handler.Start(maxPeers, s.p2pServer.MaxPeersPerIP) return nil } diff --git a/eth/handler.go b/eth/handler.go index 1ce579a0d0..94dbf0b8c6 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -20,6 +20,7 @@ import ( "errors" "math" "math/big" + "strings" "sync" "sync/atomic" "time" @@ -140,6 +141,9 @@ type handler struct { maliciousVoteMonitor *monitor.MaliciousVoteMonitor chain *core.BlockChain maxPeers int + maxPeersPerIP int + peersPerIP map[string]int + peerPerIPLock sync.Mutex downloader *downloader.Downloader blockFetcher *fetcher.BlockFetcher @@ -186,6 +190,7 @@ func newHandler(config *handlerConfig) (*handler, error) { chain: config.Chain, peers: config.PeerSet, merger: config.Merger, + peersPerIP: make(map[string]int), whitelist: config.Whitelist, directBroadcast: config.DirectBroadcast, diffSync: config.DiffSync, @@ -387,11 +392,30 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { } } // Ignore maxPeers if this is a trusted peer - if !peer.Peer.Info().Network.Trusted { + peerInfo := peer.Peer.Info() + if !peerInfo.Network.Trusted { if reject || h.peers.len() >= h.maxPeers { return p2p.DiscTooManyPeers } } + + remoteAddr := peerInfo.Network.RemoteAddress + indexIP := strings.LastIndex(remoteAddr, ":") + if indexIP == -1 { + // there could be no IP address, such as a pipe + peer.Log().Debug("runEthPeer", "no ip address, remoteAddress", remoteAddr) + } else if !peerInfo.Network.Trusted { + remoteIP := remoteAddr[:indexIP] + h.peerPerIPLock.Lock() + if num, ok := h.peersPerIP[remoteIP]; ok && num >= h.maxPeersPerIP { + h.peerPerIPLock.Unlock() + peer.Log().Info("The IP has too many peers", "ip", remoteIP, "maxPeersPerIP", h.maxPeersPerIP, + "name", peerInfo.Name, "Enode", peerInfo.Enode) + return p2p.DiscTooManyPeers + } + h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] + 1 + h.peerPerIPLock.Unlock() + } peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally @@ -626,11 +650,32 @@ func (h *handler) unregisterPeer(id string) { if err := h.peers.unregisterPeer(id); err != nil { logger.Error("Ethereum peer removal failed", "err", err) } + + peerInfo := peer.Peer.Info() + remoteAddr := peerInfo.Network.RemoteAddress + indexIP := strings.LastIndex(remoteAddr, ":") + if indexIP == -1 { + // there could be no IP address, such as a pipe + peer.Log().Debug("unregisterPeer", "name", peerInfo.Name, "no ip address, remoteAddress", remoteAddr) + } else if !peerInfo.Network.Trusted { + remoteIP := remoteAddr[:indexIP] + h.peerPerIPLock.Lock() + if h.peersPerIP[remoteIP] <= 0 { + peer.Log().Error("unregisterPeer without record", "name", peerInfo.Name, "remoteAddress", remoteAddr) + } else { + h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] - 1 + logger.Debug("unregisterPeer", "name", peerInfo.Name, "connectNum", h.peersPerIP[remoteIP]) + if h.peersPerIP[remoteIP] == 0 { + delete(h.peersPerIP, remoteIP) + } + } + h.peerPerIPLock.Unlock() + } } -func (h *handler) Start(maxPeers int) { +func (h *handler) Start(maxPeers int, maxPeersPerIP int) { h.maxPeers = maxPeers - + h.maxPeersPerIP = maxPeersPerIP // broadcast transactions h.wg.Add(1) h.txsCh = make(chan core.NewTxsEvent, txChanSize) diff --git a/eth/handler_diff_test.go b/eth/handler_diff_test.go index 06d16b4116..2a1528544c 100644 --- a/eth/handler_diff_test.go +++ b/eth/handler_diff_test.go @@ -96,7 +96,7 @@ func newTestBackendWithGenerator(blocks int) *testBackend { BloomCache: 1, Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), }) - handler.Start(100) + handler.Start(100, 100) txconfig := core.DefaultTxPoolConfig txconfig.Journal = "" // Don't litter the disk with test journals diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 48b487c874..9065d4b599 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -20,6 +20,8 @@ import ( "fmt" "math/big" "math/rand" + "strconv" + "sync" "sync/atomic" "testing" "time" @@ -148,8 +150,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { BloomCache: 1, }) ) - ethNoFork.Start(1000) - ethProFork.Start(1000) + ethNoFork.Start(1000, 1000) + ethProFork.Start(1000, 1000) // Clean up everything after ourselves defer chainNoFork.Stop() @@ -928,3 +930,109 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { } } } + +func TestOptionMaxPeersPerIP(t *testing.T) { + t.Parallel() + + handler := newTestHandler() + defer handler.close() + var ( + genesis = handler.chain.Genesis() + head = handler.chain.CurrentBlock() + td = handler.chain.GetTd(head.Hash(), head.NumberU64()) + wg = sync.WaitGroup{} + maxPeersPerIP = handler.handler.maxPeersPerIP + uniPort = 1000 + ) + + tryFunc := func(tryNum int, ip1 string, ip2 string, trust bool, doneCh chan struct{}) { + // Create a source peer to send messages through and a sink handler to receive them + p2pSrc, p2pSink := p2p.MsgPipe() + defer p2pSrc.Close() + defer p2pSink.Close() + + peer1 := p2p.NewPeerPipe(enode.ID{0}, "", nil, p2pSrc) + peer1.UpdateTestRemoteAddr(ip1 + strconv.Itoa(uniPort)) + peer2 := p2p.NewPeerPipe(enode.ID{byte(uniPort)}, "", nil, p2pSink) + peer2.UpdateTestRemoteAddr(ip2 + strconv.Itoa(uniPort)) + if trust { + peer2.UpdateTrustFlagTest() + } + uniPort++ + + src := eth.NewPeer(eth.ETH66, peer1, p2pSrc, handler.txpool) + sink := eth.NewPeer(eth.ETH66, peer2, p2pSink, handler.txpool) + defer src.Close() + defer sink.Close() + + wg.Add(1) + go func(num int) { + err := handler.handler.runEthPeer(sink, func(peer *eth.Peer) error { + wg.Done() + <-doneCh + return nil + }) + // err is nil, connection ok and it is closed by the doneCh + if err == nil { + if trust || num <= maxPeersPerIP { + return + } + // if num > maxPeersPerIP and not trust, should report: p2p.DiscTooManyPeers + t.Errorf("current num is %d, maxPeersPerIP is %d, should failed", num, maxPeersPerIP) + return + } + wg.Done() + if trust { + t.Errorf("trust node should not failed, num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err) + } + // err should be p2p.DiscTooManyPeers and num > maxPeersPerIP + if err == p2p.DiscTooManyPeers && num > maxPeersPerIP { + return + } + + t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err) + }(tryNum) + + if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil { + t.Fatalf("failed to run protocol handshake") + } + // make sure runEthPeer execute one by one. + wg.Wait() + } + + // case 1: normal case + doneCh1 := make(chan struct{}) + for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ { + tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh1) + } + close(doneCh1) + + // case 2: once the previous connection was unregisterred, new connections with same IP can be accepted. + doneCh2 := make(chan struct{}) + for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ { + tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh2) + } + close(doneCh2) + + // case 3: ipv6 address, like: [2001:db8::1]:80 + doneCh3 := make(chan struct{}) + for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ { + tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh3) + } + close(doneCh3) + + // case 4: same as case 2, but for ipv6 + doneCh4 := make(chan struct{}) + for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ { + tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh4) + } + close(doneCh4) + + // case 5: test trust node + doneCh5 := make(chan struct{}) + for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ { + tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", true, doneCh5) + } + close(doneCh5) + +} diff --git a/eth/handler_test.go b/eth/handler_test.go index b38248e47b..c7274c1d90 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -177,7 +177,7 @@ func newTestHandlerWithBlocks(blocks int) *testHandler { Sync: downloader.SnapSync, BloomCache: 1, }) - handler.Start(1000) + handler.Start(1000, 3) return &testHandler{ db: db, diff --git a/node/defaults.go b/node/defaults.go index c685dde5d1..c240ac447b 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -47,9 +47,10 @@ var DefaultConfig = Config{ WSModules: []string{"net", "web3"}, GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ - ListenAddr: ":30303", - MaxPeers: 50, - NAT: nat.Any(), + ListenAddr: ":30303", + MaxPeers: 50, + MaxPeersPerIP: 0, // by default, it will be same as MaxPeers + NAT: nat.Any(), }, } diff --git a/p2p/peer.go b/p2p/peer.go index 84efde6a4f..cfebfdcdd9 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -116,8 +116,9 @@ type Peer struct { disc chan DiscReason // events receives message send / receive events if set - events *event.Feed - testPipe *MsgPipeRW // for testing + events *event.Feed + testPipe *MsgPipeRW // for testing + testRemoteAddr string // for testing } // NewPeer returns a peer for testing purposes. @@ -203,9 +204,23 @@ func (p *Peer) RunningCap(protocol string, versions []uint) bool { // RemoteAddr returns the remote address of the network connection. func (p *Peer) RemoteAddr() net.Addr { + if len(p.testRemoteAddr) > 0 { + if addr, err := net.ResolveTCPAddr("tcp", p.testRemoteAddr); err == nil { + return addr + } + log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr) + } return p.rw.fd.RemoteAddr() } +func (p *Peer) UpdateTestRemoteAddr(addr string) { // test purpose only + p.testRemoteAddr = addr +} + +func (p *Peer) UpdateTrustFlagTest() { // test purpose only + p.rw.set(trustedConn, true) +} + // LocalAddr returns the local address of the network connection. func (p *Peer) LocalAddr() net.Addr { return p.rw.fd.LocalAddr() diff --git a/p2p/server.go b/p2p/server.go index 8008ed8ae7..b4aac9a13d 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -81,6 +81,10 @@ type Config struct { // connected. It must be greater than zero. MaxPeers int + // MaxPeersPerIP is the maximum number of peers that can be + // connected from a single IP. It must be greater than zero. + MaxPeersPerIP int `toml:",omitempty"` + // MaxPendingPeers is the maximum number of peers that can be pending in the // handshake phase, counted separately for inbound and outbound connections. // Zero defaults to preset values.