Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GossipSub: Better IWANT handling #875

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
peer.appScore = stats.appScore
peer.behaviourPenalty = stats.behaviourPenalty

peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
peer.iHaveBudget = IHavePeerBudget

method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
case event.kind
Expand Down
32 changes: 16 additions & 16 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics in mesh with no
declareGauge(libp2p_gossipsub_low_peers_topics, "number of topics in mesh with at least one but below dlow peers")
declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh with at least dlow peers (but below dhigh)")
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareSummary(libp2p_gossipsub_mcache_hit, "ratio of successful IWANT message cache lookups")
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])

proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =
g.withPeerStats(p.peerId) do (stats: var PeerStats):
Expand Down Expand Up @@ -275,25 +275,19 @@ proc handleIWant*(g: GossipSub,
var messages: seq[Message]
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer, score = peer.score
elif peer.iWantBudget <= 0:
trace "iwant: ignoring out of budget peer", peer, score = peer.score
else:
let deIwants = iwants.deduplicate()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've remove deduplication since this now happens naturally, though the computation is a tad more expensive

for iwant in deIwants:
let deIwantsMsgs = iwant.messageIds.deduplicate()
for mid in deIwantsMsgs:
for iwant in iwants:
for mid in iwant.messageIds:
trace "peer sent iwant", peer, messageID = mid
if not peer.canAskIWant(mid):
libp2p_gossipsub_received_iwants.inc(1, labelValues=["notsent"])
continue
let msg = g.mcache.get(mid)
if msg.isSome:
libp2p_gossipsub_mcache_hit.observe(1)
# avoid spam
if peer.iWantBudget > 0:
messages.add(msg.get())
dec peer.iWantBudget
else:
break
libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"])
messages.add(msg.get())
else:
libp2p_gossipsub_mcache_hit.observe(0)
libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"])
return messages

proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} =
Expand Down Expand Up @@ -616,8 +610,11 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers)
allPeers.setLen(target)

let msgIdsAsSet = ihave.messageIds.toHashSet()

for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet)

libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)

Expand All @@ -628,7 +625,10 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
# reset IHAVE cap
block:
for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget
if peer.sentIHaves.len >= g.parameters.historyLength:
peer.sentIHaves = default(HashSet[MessageId]) & peer.sentIHaves[0..^2]
else:
peer.sentIHaves.insert(default(HashSet[MessageId]), 0)
peer.iHaveBudget = IHavePeerBudget

var meshMetrics = MeshMetrics()
Expand Down
1 change: 0 additions & 1 deletion libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ const

const
BackoffSlackTime* = 2 # seconds
IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat )
IHavePeerBudget* = 10
# the max amount of IHave to expose, not by spec, but go as example
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
Expand Down
12 changes: 10 additions & 2 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import std/[sequtils, strutils, tables, hashes, options]
import std/[sequtils, strutils, tables, hashes, options, sets]
import stew/results
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
Expand Down Expand Up @@ -62,7 +62,7 @@ type
observers*: ref seq[PubSubObserver] # ref as in smart_ptr

score*: float64
iWantBudget*: int
sentIHaves*: seq[HashSet[MessageId]]
iHaveBudget*: int
maxMessageSize: int
appScore*: float64 # application specific score
Expand Down Expand Up @@ -286,6 +286,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =

asyncSpawn p.sendEncoded(encoded)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
if msgId in sentIHave:
sentIHave.excl(msgId)
return true
return false

proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
Expand All @@ -299,6 +306,7 @@ proc new*(
onEvent: onEvent,
codec: codec,
peerId: peerId,
sentIHaves: newSeq[HashSet[MessageId]](1),
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize
)
1 change: 1 addition & 0 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ suite "GossipSub internal":
let peer = gossipSub.getPubSubPeer(peerId)
let id = @[0'u8, 1, 2, 3]
gossipSub.mcache.put(id, Message())
peer.sentIHaves[^1].incl(id)
let msg = ControlIWant(
messageIDs: @[id, id, id]
)
Expand Down