Skip to content

Commit

Permalink
Refactor & Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Sep 7, 2023
1 parent 749f4aa commit d0c82ba
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 270 deletions.
4 changes: 2 additions & 2 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =

chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])

await cmb.nodev2.publish(DefaultPubsubTopic, msg)
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)

proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
Expand Down Expand Up @@ -203,7 +203,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)

cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
cmb.nodev2.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: DefaultPubsubTopic), relayHandler)

proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)

Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)
node.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), handler)

when isMainModule:
let rng = crypto.newRng()
Expand Down
6 changes: 3 additions & 3 deletions tests/test_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ procSuite "Peer Exchange":
peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange

await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler))
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))

# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
Expand Down
4 changes: 2 additions & 2 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ suite "WakuNode":
msg.payload == payload
completionFut.complete(true)

node2.subscribe(pubSubTopic, relayHandler)
node2.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(2000.millis)

await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)
await sleepAsync(2000.millis)

check:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ suite "WakuNode - Lightpush":
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
destNode.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: DefaultPubsubTopic), relayHandler)

# Wait for subscription to take effect
await sleepAsync(100.millis)
Expand Down
91 changes: 32 additions & 59 deletions tests/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)

## Then
check:
Expand Down Expand Up @@ -187,14 +187,14 @@ suite "WakuNode - Relay":
# relay handler is called
completionFut.complete(true)

node3.subscribe(pubSubTopic, relayHandler)
node3.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node1.publish(pubSubTopic, message1)
await node1.publish(some(pubSubTopic), message1)
await sleepAsync(500.millis)

# message2 never gets relayed because of the validator
await node1.publish(pubSubTopic, message2)
await node1.publish(some(pubSubTopic), message2)
await sleepAsync(500.millis)

check:
Expand All @@ -221,7 +221,7 @@ suite "WakuNode - Relay":
connOk == true

# Node 1 subscribes to topic
nodes[1].subscribe(DefaultPubsubTopic)
nodes[1].subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: DefaultPubsubTopic))
await sleepAsync(500.millis)

# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
Expand Down Expand Up @@ -268,10 +268,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)


Expand Down Expand Up @@ -309,10 +309,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)


Expand Down Expand Up @@ -354,10 +354,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)

check:
Expand Down Expand Up @@ -394,10 +394,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)

check:
Expand Down Expand Up @@ -434,10 +434,10 @@ suite "WakuNode - Relay":
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
node1.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), relayHandler)
await sleepAsync(500.millis)

await node2.publish(pubSubTopic, message)
await node2.publish(some(pubSubTopic), message)
await sleepAsync(500.millis)


Expand Down Expand Up @@ -497,7 +497,7 @@ suite "WakuNode - Relay":
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

asyncTest "Unsubscribe does not remove the subscription if a content topic also uses the shard":
asyncTest "Unsubscribe does not remove the subscription if other content topics also use the shard":
## Setup
let
nodeKey = generateSecp256k1Key()
Expand All @@ -509,57 +509,30 @@ suite "WakuNode - Relay":
## Given
let
shard = "/waku/2/rs/1/1"
topic = DefaultContentTopic
contentTopicA = DefaultContentTopic
contentTopicB = ContentTopic("/waku/2/default-content1/proto")
contentTopicC = ContentTopic("/waku/2/default-content2/proto")
handler = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} =
discard pubsubTopic
discard message

assert shard == getShard(topic).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard"

## When
node.subscribe(shard)
node.autoSubscribe(topic)
node.subscribe(SubscriptionEvent(kind: ContentSub, contentSub: contentTopicA), handler)
node.subscribe(SubscriptionEvent(kind: ContentSub, contentSub: contentTopicB), handler)
node.subscribe(SubscriptionEvent(kind: ContentSub, contentSub: contentTopicC), handler)

## Then
node.unsubscribe(shard)
node.unsubscribe(SubscriptionEvent(kind: ContentUnsub, contentUnsub: contentTopicB))
check node.wakuRelay.isSubscribed(shard)

node.autoUnsubscribe(topic)
check not node.wakuRelay.isSubscribed(shard)

## Cleanup
await node.stop()

asyncTest "AutoUnsubscribe does not remove the subscription if other content topics also use the shard":
## Setup
let
nodeKey = generateSecp256k1Key()
node = newTestWakuNode(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0))

await node.start()
await node.mountRelay()

## Given
let
shard = "/waku/2/rs/1/1"
topicA = DefaultContentTopic
topicB = "/waku/2/default-content1/proto"
topicC = "/waku/2/default-content2/proto"

assert shard == getShard(topicA).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(topicB).expect("Valid Topic"), "topic must use the same shard"
assert shard == getShard(topicC).expect("Valid Topic"), "topic must use the same shard"

## When
node.autoSubscribe(topicA)
node.autoSubscribe(topicB)
node.autoSubscribe(topicC)

## Then
node.autoUnsubscribe(topicB)
check node.wakuRelay.isSubscribed(shard)

node.autoUnsubscribe(topicA)
node.unsubscribe(SubscriptionEvent(kind: ContentUnsub, contentUnsub: contentTopicA))
check node.wakuRelay.isSubscribed(shard)

node.autoUnsubscribe(topicC)
node.unsubscribe(SubscriptionEvent(kind: ContentUnsub, contentUnsub: contentTopicC))
check not node.wakuRelay.isSubscribed(shard)

## Cleanup
Expand Down
8 changes: 4 additions & 4 deletions tests/wakunode_jsonrpc/test_jsonrpc_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ suite "Waku v2 JSON-RPC API - Relay":
let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

await node.start()
await node.mountRelay(topics = @[DefaultPubsubTopic])
await node.mountRelay(@[DefaultPubsubTopic])

# JSON-RPC server
let
Expand Down Expand Up @@ -123,7 +123,7 @@ suite "Waku v2 JSON-RPC API - Relay":
proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
dstHandlerFut.complete((topic, msg))

dstNode.subscribe(pubSubTopic, dstHandler)
dstNode.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: pubsubTopic), dstHandler)

## When
let rpcMessage = WakuMessageRPC(
Expand Down Expand Up @@ -192,7 +192,7 @@ suite "Waku v2 JSON-RPC API - Relay":

## When
for msg in messages:
await srcNode.publish(pubSubTopic, msg)
await srcNode.publish(some(pubSubTopic), msg)

await sleepAsync(200.millis)

Expand Down Expand Up @@ -252,7 +252,7 @@ suite "Waku v2 JSON-RPC API - Relay":

## When
for msg in messages:
await srcNode.autoPublish(msg)
await srcNode.publish(none(PubsubTopic), msg)

await sleepAsync(200.millis)

Expand Down
2 changes: 1 addition & 1 deletion tests/wakunode_rest/test_rest_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ suite "Waku v2 Rest API - Relay":

let client = newRestHttpClient(initTAddress(restAddress, restPort))

node.subscribe(DefaultPubsubTopic)
node.subscribe(SubscriptionEvent(kind: PubsubSub, pubsubSub: DefaultPubsubTopic))
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1

Expand Down
Loading

0 comments on commit d0c82ba

Please sign in to comment.