GossipSub: cancel inflight msgs when receiving duplicate #851

Closed — wants to merge 1 commit
27 changes: 25 additions & 2 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -43,6 +43,7 @@ logScope:
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened")
declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation")
declareCounter(libp2p_gossipsub_duplicate_during_broadcast, "number of duplicates received during message broadcast")
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")

@@ -291,10 +292,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
asyncSpawn g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))

proc lazyBroadcast(
g: GossipSub,
peers: seq[PubSubPeer],
msgIdSalted: MessageId,
msg: RPCMsg) {.async.} =
let
futsTable = g.cancellableBroadcast(peers, msg)
futs = toSeq(futsTable.values)
g.sendingFutures[msgIdSalted] = futsTable
Contributor: can't there be two inflight lazy broadcasts of the same message?

Contributor (author): That shouldn't happen, I may add an assert to be sure.
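A minimal sketch of such an assert (hypothetical, not part of this commit), guarding the registration in lazyBroadcast above:

# hypothetical guard, placed just before the futures table is stored
doAssert msgIdSalted notin g.sendingFutures,
  "lazy broadcast already in flight for this salted message id"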

await allFutures(futs)
g.sendingFutures.del(msgIdSalted)

# This isn't equal to the amount of saved bandwidth,
# because lower layer may send the data even after
# cancellation
let cancelledCount = futs.countIt(it.cancelled)
libp2p_gossipsub_duplicate_during_broadcast.inc(cancelledCount.int64)

proc validateAndRelay(g: GossipSub,
msg: Message,
msgId, msgIdSalted: MessageId,
@@ -339,7 +358,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
asyncSpawn g.lazyBroadcast(toSeq(toSendPeers), msgIdSalted, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -397,6 +416,10 @@ method rpcHandler*(g: GossipSub,
let delay = Moment.now() - g.firstSeen(msgId)
g.rewardDelivered(peer, msg.topicIds, false, delay)

g.sendingFutures.withValue(msgIdSalted, futs):
futs[].withValue(peer.peerId, fut):
fut[].cancel()

libp2p_gossipsub_duplicate.inc()

# onto the next message
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -677,7 +677,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
asyncSpawn g.send(peer, RPCMsg(control: some(control)))

g.mcache.shift() # shift the cache

2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -145,6 +145,7 @@ type

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
SendingFuturesTable* = Table[MessageId, Table[PeerId, Future[void]]]

RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
RoutingRecordsHandler* =
@@ -164,6 +165,7 @@ type
control*: Table[string, ControlMessage] # pending control messages
mcache*: MCache # messages cache
validationSeen*: ValidationSeenTable # peers who sent us message in validation
sendingFutures*: SendingFuturesTable # messages which are currently being sent
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval
heartbeatRunning*: bool
41 changes: 32 additions & 9 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -139,20 +139,17 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg): Future[void] {.raises: [Defect].} =
## Attempt to send `msg` to remote peer
##

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)

proc broadcast*(
proc updateBroadcastMetrics(
p: PubSub,
sendPeers: auto, # Iteratble[PubSubPeer]
msg: RPCMsg) {.raises: [Defect].} =
## Attempt to send `msg` to the given peers

let npeers = sendPeers.len.int64
msg: RPCMsg,
npeers: int64) =
for sub in msg.subscriptions:
if sub.subscribe:
if p.knownTopics.contains(sub.topic):
@@ -192,24 +189,50 @@
else:
libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iteratble[PubSubPeer]
msg: RPCMsg) {.raises: [Defect].} =
## Attempt to send `msg` to the given peers

p.updateBroadcastMetrics(msg, sendPeers.len.int64)
trace "broadcasting messages to peers",
peers = sendPeers.len, msg = shortLog(msg)

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
asyncSpawn p.send(peer, msg)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)

proc cancellableBroadcast*(
p: PubSub,
sendPeers: auto, # Iteratble[PubSubPeer]
msg: RPCMsg): Table[PeerId, Future[void]] {.raises: [Defect].} =
## Attempt to send `msg` to the given peers

p.updateBroadcastMetrics(msg, sendPeers.len)
trace "broadcasting messages to peers",
peers = sendPeers.len, msg = shortLog(msg)

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
result[peer.peerId] = p.send(peer, msg)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
result[peer.peerId] = peer.sendEncoded(encoded)
Contributor: doesn't this already synchronously put bytes in the send buffer?

Contributor (author): With mplex, that will indeed go synchronously through every layer, down to chronos. I thought that chronos supported write cancellation, but apparently not; quoting cheatfate:

There is a big problem with write and i wish not to solve it today or tomorrow, because write is made using queue, so cancelling write in the middle should remove this write call from the queue, but it will break cross-platform compatibility, because on Windows this queue is in system, so i can't freely remove write call from the middle of the queue...

So we could add cancellation support on Linux at least.

With yamux, there is an intermediary queue before chronos (since there is per-stream backpressure) which does support cancellation:
https://github.com/status-im/nim-libp2p/blob/351bda2b56e4389a91fb42199621b3617bdbbdd3/libp2p/muxers/yamux/yamux.nim#L285

So this PR only makes sense if we get chronos support for write cancellation on Linux, or enable yamux in Nimbus :/

Contributor (author) @Menduist, Jan 25, 2023: Getting TCP cancellation support in chronos seems less complex than expected (even on Windows): status-im/nim-chronos#353
I'll wait for reviews to be sure.
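To make the trade-off concrete, here is a minimal self-contained chronos sketch (a model only: queuedSend and the turn futures are hypothetical stand-ins, not mplex or yamux internals). It shows that cancelling a pending send only saves bandwidth when the cancellation lands before the lower layer has picked the bytes up:

import chronos

proc queuedSend(turn: Future[void], data: string) {.async.} =
  # model of a per-stream queue slot: wait for our turn, then "write"
  await turn
  echo "wrote ", data.len, " bytes"  # past this point, cancelling saves nothing

proc demo() {.async.} =
  # case 1: the duplicate arrives while the write is still queued
  let turn1 = newFuture[void]("turn1")
  let early = queuedSend(turn1, "payload A")
  early.cancel()                     # cancelled before the writer reaches it

  # case 2: the duplicate arrives after the writer already picked the item up
  let turn2 = newFuture[void]("turn2")
  let late = queuedSend(turn2, "payload B")
  turn2.complete()                   # the writer reached this queue slot
  await sleepAsync(10.milliseconds)  # let both sends settle
  late.cancel()                      # too late: the bytes were already written

  echo "early cancelled: ", early.cancelled()  # true  -> bandwidth saved
  echo "late cancelled:  ", late.cancelled()   # false -> data went out anyway

waitFor demo()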


proc sendSubs*(p: PubSub,
peer: PubSubPeer,
topics: openArray[string],
subscribe: bool) =
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe))
asyncSpawn p.send(peer, RPCMsg.withSubs(topics, subscribe))

for topic in topics:
if subscribe:
8 changes: 5 additions & 3 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -248,7 +248,9 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
except CancelledError as exc:
raise exc
except CatchableError as exc:
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
@@ -257,7 +259,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =

await conn.close() # This will clean up the send connection

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool): Future[void] {.raises: [Defect].} =
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

# When sending messages, we take care to re-encode them with the right
@@ -277,7 +279,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
sendMetrics(msg)
encodeRpcMsg(msg, anonymize)

asyncSpawn p.sendEncoded(encoded)
return p.sendEncoded(encoded)

proc new*(
T: typedesc[PubSubPeer],