diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index dd77aefb60..11c9898e93 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -5,7 +5,7 @@ else: import - std/[options, sugar, sets, sequtils, times, strutils, math], + std/[options, sets, tables, sequtils, times, strutils, math], chronos, chronicles, metrics, @@ -14,13 +14,11 @@ import libp2p/nameresolving/nameresolver import ../../common/nimchronos, + ../../common/enr, ../../waku_core, ../../waku_relay, ../../waku_enr/sharding, ../../waku_enr/capabilities, - ../../waku_store/common, - ../../waku_filter_v2/common, - ../../waku_lightpush/common, ../../waku_metadata, ./peer_store/peer_storage, ./waku_peer_store @@ -728,38 +726,77 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = await pm.connectToNodes(uniquePeers[i..= pruningCount: break - if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts: - pm.peerStore.del(peerId) - pruned += 1 - # if we still need to prune, prune peers that are not connected + peersToPrune.incl(peerId) + let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId) - for peerId in notConnected: - if peersToPrune - pruned == 0: + + var shardlessPeers: seq[PeerId] + var peersByShard = initTable[uint16, seq[PeerId]]() + + for peer in notConnected: + if not pm.peerStore[ENRBook].contains(peer): + shardlessPeers.add(peer) + continue + + let record = pm.peerStore[ENRBook][peer] + + let rec = record.toTyped().valueOr: + shardlessPeers.add(peer) + continue + + let rs = rec.relaySharding().valueOr: + shardlessPeers.add(peer) + continue + + for shard in rs.shardIds: + peersByShard.mgetOrPut(shard, @[peer]).add(peer) + + # prune not connected peers without shard + for peer in shardlessPeers: + if peersToPrune.len >= pruningCount: break - pm.peerStore.del(peerId) - pruned += 1 - let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len + peersToPrune.incl(peer) + + # calculate the avg peers per shard + let total = sum(toSeq(peersByShard.values).mapIt(it.len)) + let avg = min(1, total div max(1, peersByShard.len)) + + # prune peers from shard with higher than avg count + for shard, peers in peersByShard.pairs: + let count = max(peers.len - avg, 0) + for peer in peers[0..count]: + if peersToPrune.len >= pruningCount: + break + + peersToPrune.incl(peer) + + for peer in peersToPrune: + pm.peerStore.delete(peer) + + let afterNumPeers = pm.peerStore[AddressBook].book.len + debug "Finished pruning peer store", beforeNumPeers = numPeers, afterNumPeers = afterNumPeers, capacity = capacity, - pruned = pruned + pruned = peersToPrune.len proc selectPeer*(pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)): Option[RemotePeerInfo] = debug "Selecting peer from peerstore", protocol=proto