Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GossipSub: IDontWant #934

Merged
merged 2 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
g.handlePrune(peer, control.prune)

var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIds.len > 0:
respControl.iwant.add(iwant)
Expand Down Expand Up @@ -332,11 +333,25 @@ proc validateAndRelay(g: GossipSub,
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary?

# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)

if msg.data.len > msgId.len * 10:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this condition more descriptive - like making it a proc or adding a comment with the reasoning?

g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
Copy link
Contributor

@arnetheduck arnetheduck Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should only broadcast this to peers that haven't themselves sent us an idontwant for the given hash, ie this should be moved below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true in this version of IDontWant, because as soon as we sent the IDontWant, we also send the full message, so it's too late to "choke" us
However, in future versions we might send the IDontWant earlier (before validation), or send the full message later (staggered sending)

So in these future versions, receiving a IDontWant won't mean it's too late to send ours. Sending the IDontWant to everyone is "forward-compatibility" at the cost of a few bytes

idontwant: @[ControlIWant(messageIds: @[msgId])]
))))

for peer in toSendPeers:
for heDontWant in peer.heDontWants:
if msgId in heDontWant:
seenPeers.incl(peer)
break
toSendPeers.excl(seenPeers)


# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
Expand Down
12 changes: 12 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ proc handleIHave*(g: GossipSub,
g.rng.shuffle(res.messageIds)
return res

proc handleIDontWant*(g: GossipSub,
peer: PubSubPeer,
iDontWants: seq[ControlIWant]) =
for dontWant in iDontWants:
for message in dontWant.messageIds:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messageId?

if peer.heDontWants[^1].len > 1000: break
if message.len > 100: break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be a continue?

peer.heDontWants[^1].incl(message)

proc handleIWant*(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
Expand Down Expand Up @@ -629,6 +638,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
if peer.sentIHaves.len > g.parameters.historyLength:
discard peer.sentIHaves.popLast()
peer.heDontWants.addFirst(default(HashSet[MessageId]))
if peer.heDontWants.len > g.parameters.historyLength:
discard peer.heDontWants.popLast()
peer.iHaveBudget = IHavePeerBudget
peer.pingBudget = PingsPeerBudget

Expand Down
4 changes: 3 additions & 1 deletion libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import rpc/[messages, message, protobuf],
../../protobuf/minprotobuf,
../../utility

export peerid, connection
export peerid, connection, deques

logScope:
topics = "libp2p pubsubpeer"
Expand Down Expand Up @@ -60,6 +60,7 @@ type

score*: float64
sentIHaves*: Deque[HashSet[MessageId]]
heDontWants*: Deque[HashSet[MessageId]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not iDontWants or sentIDontWants?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the list of "iDontWant" he sent, so I think "heDontWants" or "receivedDontWants" is more descriptive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so maybe rcvDontWants.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just dontWants is enough really

iHaveBudget*: int
pingBudget*: int
maxMessageSize: int
Expand Down Expand Up @@ -317,3 +318,4 @@ proc new*(
maxMessageSize: maxMessageSize
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/rpc/messages.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type
iwant*: seq[ControlIWant]
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
idontwant*: seq[ControlIWant]

ControlIHave* = object
topicId*: string
Expand Down
6 changes: 6 additions & 0 deletions libp2p/protocols/pubsub/rpc/protobuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(3, graft)
for prune in control.prune:
ipb.write(4, prune)
for idontwant in control.idontwant:
ipb.write(5, idontwant)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
Expand Down Expand Up @@ -210,6 +212,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
var iwantpbs: seq[seq[byte]]
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
var idontwant: seq[seq[byte]]
if ? cpb.getRepeatedField(1, ihavepbs):
for item in ihavepbs:
control.ihave.add(? decodeIHave(initProtoBuffer(item)))
Expand All @@ -222,6 +225,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
if ? cpb.getRepeatedField(4, prunepbs):
for item in prunepbs:
control.prune.add(? decodePrune(initProtoBuffer(item)))
if ? cpb.getRepeatedField(5, idontwant):
for item in idontwant:
control.idontwant.add(? decodeIWant(initProtoBuffer(item)))
trace "decodeControl: message statistics", graft_count = len(control.graft),
prune_count = len(control.prune),
ihave_count = len(control.ihave),
Expand Down
60 changes: 60 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -796,3 +796,63 @@ suite "GossipSub":
)

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - iDontWant":
# 3 nodes: A <=> B <=> C
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
# and check that B doesn't relay the message to C.
# We also check that B sends IDONTWANT to C, but not A
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
ok(newSeq[byte](10))
let
nodes = generateNodes(
3,
gossip = true,
msgIdProvider = dumbMsgIdProvider
)

nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
nodes[2].switch.start(),
)

await nodes[0].switch.connect(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
await nodes[1].switch.connect(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs)

let bFinished = newFuture[void]()
proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} = bFinished.complete()
proc handlerC(topic: string, data: seq[byte]) {.async, gcsafe.} = doAssert false

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
nodes[2].subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
var gossip3: GossipSub = GossipSub(nodes[2])

check: gossip3.mesh.peers("foobar") == 1

gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
))))
checkExpiring: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)

tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1

await bFinished

checkExpiring: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
check: toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0)

await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop(),
nodes[2].switch.stop()
)

await allFuturesThrowing(nodesFut.concat())