Skip to content

Commit

Permalink
chore: deprecating pubsub topic (#2997)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Sep 10, 2024
1 parent d3e6717 commit a3cd2a1
Show file tree
Hide file tree
Showing 35 changed files with 322 additions and 270 deletions.
6 changes: 4 additions & 2 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ when not (compileOption("threads")):

{.push raises: [].}

import std/[strformat, strutils, times, options, random]
import std/[strformat, strutils, times, options, random, sequtils]
import
confutils,
chronicles,
Expand Down Expand Up @@ -379,7 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed")

if conf.relay:
await node.mountRelay(conf.topics.split(" "))
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
await node.mountRelay(shards)

await node.mountLibp2pPing()

Expand Down
18 changes: 13 additions & 5 deletions apps/chat2/config_chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,19 @@ type
name: "keep-alive"
.}: bool

topics* {.
desc: "Default topics to subscribe to (space separated list).",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
.}: uint16

shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[uint16(0)],
name: "shard"
.}: seq[uint16]

## Store config
store* {.
Expand Down
6 changes: 0 additions & 6 deletions apps/chat2bridge/config_chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ type Chat2MatterbridgeConf* = object
name: "nodekey"
.}: crypto.PrivateKey

topics* {.
desc: "Default topics to subscribe to (space separated list)",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string

store* {.
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
.}: bool
Expand Down
2 changes: 1 addition & 1 deletion apps/liteprotocoltester/tester_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type LiteProtocolTesterConf* = object

## TODO: extend lite protocol tester configuration based on testing needs
# shards* {.
# desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
# desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
# .}: seq[uint16]
Expand Down
23 changes: 15 additions & 8 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,12 @@ proc initAndStartApp(
ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort)
)
builder.withWakuCapabilities(flags)
let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error = addShardedTopics.error
return err($addShardedTopics.error)

builder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
error "failed to add sharded topics to ENR", error = error
return err("failed to add sharded topics to ENR: " & $error)

let recordRes = builder.build()
let record =
Expand Down Expand Up @@ -561,11 +563,14 @@ when isMainModule:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()

conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes
conf.pubsubTopics = twnClusterConf.pubsubTopics
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork

if conf.shards.len == 0:
conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1))

if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
Expand Down Expand Up @@ -631,9 +636,11 @@ when isMainModule:
error "failed to mount waku metadata protocol: ", err = error
quit 1

for pubsubTopic in conf.pubsubTopics:
# Subscribe the node to the default pubsubtopic, to count messages
subscribeAndHandleMessages(node, pubsubTopic, msgPerContentTopic)
for shard in conf.shards:
# Subscribe the node to the shards, to count messages
subscribeAndHandleMessages(
node, $RelayShard(shardId: shard, clusterId: conf.clusterId), msgPerContentTopic
)

# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)
Expand Down
15 changes: 10 additions & 5 deletions apps/networkmonitor/networkmonitor_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ type NetworkMonitorConf* = object
name: "dns-discovery-url"
.}: string

pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
name: "shard"
.}: seq[uint16]

numShardsInNetwork* {.
desc: "Number of shards in the network", name: "num-shards-in-network"
.}: uint32

refreshInterval* {.
desc: "How often new peers are discovered and connected to (in seconds)",
Expand All @@ -55,7 +60,7 @@ type NetworkMonitorConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 1,
name: "cluster-id"
.}: uint32
.}: uint16

rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
Expand Down
3 changes: 2 additions & 1 deletion apps/wakucanary/wakucanary.nim
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type WakuCanaryConf* = object
.}: bool

shards* {.
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[],
name: "shard",
abbr: "s"
Expand Down
2 changes: 1 addition & 1 deletion docs/operators/how-to/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ By default a nwaku node will:
See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key.
- listen for incoming libp2p connections on the default TCP port (`60000`)
- enable `relay` protocol
- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0`
- subscribe to the default clusterId (0) and shard (0)
- enable `store` protocol, but only as a client.
This implies that the nwaku node will not persist any historical messages itself,
but can query `store` service peers who do so.
Expand Down
16 changes: 6 additions & 10 deletions tests/node/peer_manager/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,22 @@ suite "Peer Manager":
serverKey {.threadvar.}: PrivateKey
clientKey {.threadvar.}: PrivateKey
clusterId {.threadvar.}: uint64
shardTopic0 {.threadvar.}: string
shardTopic1 {.threadvar.}: string

asyncSetup:
listenPort = Port(0)
listenAddress = ValidIpAddress.init("0.0.0.0")
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
clusterId = 1
shardTopic0 = "/waku/2/rs/" & $clusterId & "/0"
shardTopic1 = "/waku/2/rs/" & $clusterId & "/1"

asyncTest "light client is not disconnected":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and filter
Expand Down Expand Up @@ -71,10 +67,10 @@ suite "Peer Manager":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and relay
Expand Down Expand Up @@ -104,10 +100,10 @@ suite "Peer Manager":
# Given two nodes with different shardIds
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)

# And both mount metadata and relay
Expand Down
6 changes: 3 additions & 3 deletions tests/node/test_wakunode_relay_rln.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ proc getWakuRlnConfigOnChain*(
)

proc setupRelayWithOnChainRln*(
node: WakuNode, pubsubTopics: seq[string], wakuRlnConfig: WakuRlnConfig
node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig
) {.async.} =
await node.mountRelay(pubsubTopics)
await node.mountRelay(shards)
await node.mountRlnRelay(wakuRlnConfig)

suite "Waku RlnRelay - End to End - Static":
Expand Down Expand Up @@ -223,7 +223,7 @@ suite "Waku RlnRelay - End to End - Static":
nodekey = generateSecp256k1Key()
node = newTestWakuNode(nodekey, parseIpAddress("0.0.0.0"), Port(0))

await node.mountRelay(@[DefaultPubsubTopic])
await node.mountRelay(@[DefaultRelayShard])

let contractAddress = await uploadRLNContract(EthClient)
let wakuRlnConfig = WakuRlnConfig(
Expand Down
9 changes: 6 additions & 3 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -418,21 +418,24 @@ procSuite "Peer Manager":
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/3/0"],
clusterId = 3,
shards = @[uint16(0)],
)

# same network
node2 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)

node1.mountMetadata(3).expect("Mounted Waku Metadata")
Expand Down
10 changes: 5 additions & 5 deletions tests/test_relay_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ procSuite "Relay (GossipSub) Peer Exchange":
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)

# When both client and server mount relay without a handler
await node1.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler))
await node1.mountRelay(@[DefaultRelayShard])
await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))

# Then the relays are mounted without a handler
check:
Expand Down Expand Up @@ -75,9 +75,9 @@ procSuite "Relay (GossipSub) Peer Exchange":
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler

# Givem the nodes mount relay with a peer exchange handler
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle))
await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))

# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_enr.nim
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding":
clusterId: uint16 = 22
shardId: uint16 = 1

let shard = RelayShard.staticSharding(clusterId, shardId)
let shard = RelayShard(clusterId: clusterId, shardId: shardId)

## When
let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards")
Expand Down
12 changes: 6 additions & 6 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ suite "WakuNode":
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000))
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002))
pubSubTopic = "/waku/2/rs/0/0"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)

# Setup node 1 with stable codec "/vac/waku/relay/2.0.0"

await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
node1.wakuRelay.codec = "/vac/waku/relay/2.0.0"

# Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2"

await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2"

check:
Expand All @@ -58,15 +58,15 @@ suite "WakuNode":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(2000.millis)

var res = await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some($shard), message)
assert res.isOk(), $res.error

await sleepAsync(2000.millis)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ suite "WakuNode - Lightpush":

await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())

await destNode.mountRelay(@[DefaultPubsubTopic])
await bridgeNode.mountRelay(@[DefaultPubsubTopic])
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()

Expand Down
Loading

0 comments on commit a3cd2a1

Please sign in to comment.