Skip to content

Commit

Permalink
added sharded peer store pruning (#2167)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Dec 7, 2023
1 parent dba9820 commit 281c13a
Showing 1 changed file with 61 additions and 24 deletions.
85 changes: 61 additions & 24 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ else:


import
std/[options, sugar, sets, sequtils, times, strutils, math],
std/[options, sets, tables, sequtils, times, strutils, math],
chronos,
chronicles,
metrics,
Expand All @@ -14,13 +14,11 @@ import
libp2p/nameresolving/nameresolver
import
../../common/nimchronos,
../../common/enr,
../../waku_core,
../../waku_relay,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_store/common,
../../waku_filter_v2/common,
../../waku_lightpush/common,
../../waku_metadata,
./peer_store/peer_storage,
./waku_peer_store
Expand Down Expand Up @@ -728,38 +726,77 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
await pm.connectToNodes(uniquePeers[i..<stop])

proc prunePeerStore*(pm: PeerManager) =
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
let numPeers = pm.peerStore[AddressBook].book.len
let capacity = pm.peerStore.capacity
if numPeers < capacity:
if numPeers <= capacity:
return

debug "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
let peersToPrune = numPeers - capacity

# prune peers with too many failed attempts
var pruned = 0
# copy to avoid modifying the book while iterating
let peerKeys = toSeq(pm.peerStore[NumberFailedConnBook].book.keys)
for peerId in peerKeys:
if peersToPrune - pruned == 0:
let pruningCount = numPeers - capacity
var peersToPrune: HashSet[PeerId]

# prune failed connections
for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
if count < pm.maxFailedAttempts:
continue

if peersToPrune.len >= pruningCount:
break
if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
pm.peerStore.del(peerId)
pruned += 1

# if we still need to prune, prune peers that are not connected
peersToPrune.incl(peerId)

let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
for peerId in notConnected:
if peersToPrune - pruned == 0:

var shardlessPeers: seq[PeerId]
var peersByShard = initTable[uint16, seq[PeerId]]()

for peer in notConnected:
if not pm.peerStore[ENRBook].contains(peer):
shardlessPeers.add(peer)
continue

let record = pm.peerStore[ENRBook][peer]

let rec = record.toTyped().valueOr:
shardlessPeers.add(peer)
continue

let rs = rec.relaySharding().valueOr:
shardlessPeers.add(peer)
continue

for shard in rs.shardIds:
peersByShard.mgetOrPut(shard, @[peer]).add(peer)

# prune not connected peers without shard
for peer in shardlessPeers:
if peersToPrune.len >= pruningCount:
break
pm.peerStore.del(peerId)
pruned += 1

let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
peersToPrune.incl(peer)

# calculate the avg peers per shard
let total = sum(toSeq(peersByShard.values).mapIt(it.len))
let avg = min(1, total div max(1, peersByShard.len))

# prune peers from shard with higher than avg count
for shard, peers in peersByShard.pairs:
let count = max(peers.len - avg, 0)
for peer in peers[0..count]:
if peersToPrune.len >= pruningCount:
break

peersToPrune.incl(peer)

for peer in peersToPrune:
pm.peerStore.delete(peer)

let afterNumPeers = pm.peerStore[AddressBook].book.len

debug "Finished pruning peer store", beforeNumPeers = numPeers,
afterNumPeers = afterNumPeers,
capacity = capacity,
pruned = pruned
pruned = peersToPrune.len

proc selectPeer*(pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)): Option[RemotePeerInfo] =
debug "Selecting peer from peerstore", protocol=proto
Expand Down

0 comments on commit 281c13a

Please sign in to comment.