Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(gossipsub): Topic Membership Tests #1201

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
871efab
added test wrt subscribe and unsubscribe
shashankshampi Sep 26, 2024
dc7f8d4
added tests/pubsub/testgossipinternal2 file
shashankshampi Sep 26, 2024
5790b6f
linters
shashankshampi Sep 26, 2024
eced002
Merge branch 'master' into block6Test
shashankshampi Sep 26, 2024
1c2e221
refactor and suite name refactor
shashankshampi Sep 27, 2024
2923a2d
test(gossipsub): added test for membership for join and leave topic
shashankshampi Sep 30, 2024
fda0d2b
test(gossipsub): import optimization
shashankshampi Oct 1, 2024
eb2f6bf
test(gossipsub): Test cases covering subscribe and unsubscribe Events
shashankshampi Sep 26, 2024
9c0966e
Merge branch 'master' into block6Test
shashankshampi Oct 2, 2024
27c2850
Merge branch 'master' into block6Test2
shashankshampi Oct 2, 2024
25df50d
updtaed as per review comment
shashankshampi Oct 3, 2024
f0c8c5b
review comments to remove unwanted comments
shashankshampi Oct 3, 2024
46b7125
removed internal subscribe and unsubscribe test
shashankshampi Oct 3, 2024
19d3ead
removed unwanted check
shashankshampi Oct 3, 2024
f42a763
added assertion for handle SUBSCRIBE to the topic
shashankshampi Oct 3, 2024
e10e4d0
rebase with block6Test
shashankshampi Oct 5, 2024
89473da
TC fix
shashankshampi Oct 5, 2024
806592d
PR update
shashankshampi Oct 7, 2024
2d38e8a
fix in test logic for multiple peers join and leave topic simultaneously
shashankshampi Oct 8, 2024
d594c04
addressed wornderful review comments
shashankshampi Oct 10, 2024
a12b56c
Merge branch 'block6Test' into block6Test2
shashankshampi Oct 10, 2024
cb7ccae
updated as per review comment
shashankshampi Oct 10, 2024
ff6b274
review comment fix
shashankshampi Oct 10, 2024
567a456
Merge branch 'block6Test' into block6Test2
shashankshampi Oct 10, 2024
cc8e976
removed PubSub in sunscribe
shashankshampi Oct 10, 2024
e68685c
added part 2 PR in same
shashankshampi Oct 16, 2024
44dd7d1
added updated logic to use common handler and create peers nodes as r…
shashankshampi Oct 21, 2024
d80abe0
review comment fix
shashankshampi Oct 22, 2024
a7e80de
rebase
shashankshampi Oct 24, 2024
be818b9
Remove tests/pubsub/testgossipmembership from the PR
shashankshampi Oct 24, 2024
a3c29ae
rebase
shashankshampi Oct 24, 2024
aa34c7f
fix in last test
shashankshampi Oct 24, 2024
df674d5
another set of review comment fix
shashankshampi Oct 24, 2024
1bfdded
added documentation as per request
shashankshampi Oct 25, 2024
49ce782
added alex changes
shashankshampi Oct 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ build/
*.exe
*.dll
.vscode/
.idea/
.DS_Store
tests/pubsub/testgossipsub
examples/*.md
Expand Down
40 changes: 0 additions & 40 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,6 @@ suite "GossipSub internal":
teardown:
checkTrackers()

asyncTest "subscribe/unsubscribeAll":
let gossipSub = TestGossipSub.init(newStandardSwitch())

proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
discard

let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()

var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< 15:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)

# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, handler)

check:
gossipSub.topics.contains(topic)
gossipSub.gossipsub[topic].len() > 0
gossipSub.mesh[topic].len() > 0

# test via dynamic dispatch
gossipSub.PubSub.unsubscribeAll(topic)

check:
topic notin gossipSub.topics # not in local topics
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "topic params":
let params = TopicParams.init()
params.validateParameters().tryGet()
Expand Down
254 changes: 254 additions & 0 deletions tests/pubsub/testgossipmembership.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.used.}

import std/[options, deques, sequtils, enumerate, algorithm, sets]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable]
import ../../libp2p/protocols/pubsub/rpc/[message, messages]
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf
import utils
import chronos
import chronicles
import ../helpers

proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard

proc voidTopicHandler(topic: string, data: seq[byte]) {.async.} =
discard

const MsgIdSuccess = "msg id gen success"
let DURATION_TIMEOUT = 500.milliseconds

suite "GossipSub Topic Membership Tests":
teardown:
checkTrackers()
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d

# Test for subscribing to a topic and verifying mesh and gossipsub structures
asyncTest "handle SUBSCRIBE to the topic":
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
# Given 5 gossipsub nodes
let
numberOfNodes = 5
topic = "test-topic"
nodes = generateNodes(numberOfNodes, gossip = true)

await allFuturesThrowing(nodes.mapIt(it.switch.start()))

# When all of them are connected and subscribed to the same topic
await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Then their related attributes should reflect that
for node in nodes:
let currentGossip = GossipSub(node)
check currentGossip.topics.contains(topic)
check currentGossip.gossipsub[topic].len() == numberOfNodes - 1
check currentGossip.mesh[topic].len() == numberOfNodes - 1

await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

# Test unsubscribing from a topic and verifying removal from relevant data structures
asyncTest "handle UNSUBSCRIBE to the topic":
# Given 5 nodes subscribed to a topic
let
numberOfNodes = 5
topic = "test-topic"
nodes = generateNodes(numberOfNodes, gossip = true)

await allFuturesThrowing(nodes.mapIt(it.switch.start()))

await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# When all nodes unsubscribe from the topic
for node in nodes:
node.unsubscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Then the topic should be removed from relevant data structures
for node in nodes:
let currentGossip = GossipSub(node)
check topic notin currentGossip.topics
if topic in currentGossip.mesh:
check currentGossip.mesh[topic].len == 0
else:
check topic notin currentGossip.mesh
if topic in currentGossip.gossipsub:
check currentGossip.gossipsub[topic].len == 0
else:
check topic notin currentGossip.gossipsub

await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

# Test subscribing and unsubscribing multiple topics
asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics":
# Given 3 nodes and multiple topics
let
numberOfNodes = 3
topics = ["topic1", "topic2", "topic3"].toSeq()
nodes = generateNodes(numberOfNodes, gossip = true)

await allFuturesThrowing(nodes.mapIt(it.switch.start()))

# When nodes subscribe to multiple topics
await subscribeNodes(nodes)
for node in nodes:
for topic in topics:
node.subscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Then all nodes should be subscribed to the topics initially
for node in nodes:
let currentGossip = GossipSub(node)
check currentGossip.topics.len == topics.len
for topic in topics:
check currentGossip.gossipsub[topic].len == numberOfNodes - 1

# When they unsubscribe from all topics
for node in nodes:
for topic in topics:
node.unsubscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Then topics should be removed from mesh and gossipsub
for node in nodes:
let currentGossip = GossipSub(node)
for topic in topics:
check topic notin currentGossip.topics
check topic notin currentGossip.mesh
check topic notin currentGossip.gossipsub

await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

# Test ensuring the number of subscriptions does not exceed a set limit
asyncTest "subscription limit test":
# Given one node and a subscription limit of 10 topics
let
topicCount = 15
gossipSubParams = 10
topicNames = toSeq(mapIt(0 .. topicCount - 1, "topic" & $it))
numberOfNodes = 1
nodes = generateNodes(numberOfNodes, gossip = true)

await allFuturesThrowing(nodes.mapIt(it.switch.start()))

# When attempting to subscribe to 15 topics
let gossipSub = GossipSub(nodes[0])
gossipSub.topicsHigh = gossipSubParams

for topic in topicNames:
if gossipSub.topics.len < gossipSub.topicsHigh:
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
gossipSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)

check gossipSub.topics.len == gossipSubParams

await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

# Test verifying peers joining a topic using `JOIN(topic)`
asyncTest "handle JOIN topic and mesh is updated":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this case and the handle SUBSCRIBE to the topic one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first test is primarily about verifying mesh behavior after joining.
The second test is broader and checks both mesh and gossipsub structures to ensure proper subscription handling.

In short, the second test provides a more thorough validation by checking both the mesh and gossipsub structures, while the first test is specifically focused on the mesh when peers join a topic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might be able to condense the two cases into one.

Are they two different cases in the test plans?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were 6 TC on documentation, and I followed the same.
Let me know if this is not required now will remove it

# Given 5 nodes and a join request to a topic
let
topic = "test-join-topic"
numberOfNodes = 5
nodes = generateNodes(numberOfNodes, gossip = true)

await allFuturesThrowing(nodes.mapIt(it.switch.start()))

# When nodes subscribe to the topic
await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Then each node's mesh should reflect this update
for node in nodes:
let currentGossip = GossipSub(node)
check currentGossip.mesh[topic].len == numberOfNodes - 1
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
check currentGossip.gossipsub.hasKey(topic)
check currentGossip.topics.contains(topic)
Comment on lines +197 to +198
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, do these checks as exhaustive as possible :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all 3 validations is already there. Please help with any validation you think is missed out


await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))

# Test the behavior when multiple peers join and leave a topic simultaneously
asyncTest "multiple peers join and leave topic simultaneously":
# Given 6 nodes and a shared topic
let
numberOfNodes = 6
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true)
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))

# When they all join the topic
await subscribeNodes(nodes)
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
for node in nodes:
node.subscribe(topic, voidTopicHandler)

await sleepAsync(2 * DURATION_TIMEOUT)

# Their attributes should reflect in this loop
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i])
check currentGossip.gossipsub.hasKey(topic)
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
check currentGossip.mesh.hasKey(topic)
check currentGossip.topics.contains(topic)
Comment on lines +221 to +223
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here (re: exhaustiveness).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all three validations are already there. Please help with any validation you think is missed out


# Make sure all nodes are connected between themselves.
for x in 0 ..< numberOfNodes:
for y in 0 ..< numberOfNodes:
if x != y:
await waitSub(nodes[x], nodes[y], topic)

await sleepAsync(2 * DURATION_TIMEOUT)

let expectedNumberOfPeers = numberOfNodes - 1
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i])
check:
currentGossip.gossipsub[topic].len == expectedNumberOfPeers
currentGossip.mesh[topic].len == expectedNumberOfPeers
currentGossip.fanout.len == 0

# When some peers unsubscribe
let firstNodeGossip = GossipSub(nodes[0])
let peersToUnsubscribe = nodes[1 ..< 3]
for peer in peersToUnsubscribe:
peer.unsubscribe(topic, voidTopicHandler)

await sleepAsync(3 * DURATION_TIMEOUT)

# Then the mesh and gossipsub should reflect the updated peer count
check firstNodeGossip.mesh.getOrDefault(topic).len == 3
check firstNodeGossip.gossipsub[topic].len == 3
check topic in firstNodeGossip.topics

await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
Loading