Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Autosharding API for relay subscriptions #1983

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message)
asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)

# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
Expand Down Expand Up @@ -490,8 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))

if conf.rlnRelay:
info "WakuRLNRelay is enabled"
Expand Down
8 changes: 4 additions & 4 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =

chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])

await cmb.nodev2.publish(DefaultPubsubTopic, msg)
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)

proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
Expand Down Expand Up @@ -204,7 +204,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)

cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))

proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
Expand All @@ -229,8 +229,8 @@ when isMainModule:

# Install enabled API handlers:
if conf.relay:
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, rpcServer, cache)

if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)
Expand Down
2 changes: 1 addition & 1 deletion apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ proc subscribeAndHandleMessages(node: WakuNode,
else:
msgPerContentTopic[msg.contentTopic] = 1

node.subscribe(pubsubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
Expand Down
63 changes: 43 additions & 20 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,11 @@ import
../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
./internal_config,
./external_config
import
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
Expand All @@ -56,7 +42,20 @@ import
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
./internal_config,
./external_config

logScope:
topics = "wakunode app"
Expand Down Expand Up @@ -576,8 +575,20 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Relay REST API
if conf.relay:
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)
let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))

installRelayApiHandlers(server.router, app.node, cache)

## Filter REST API
if conf.filter:
Expand Down Expand Up @@ -610,8 +621,20 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server)

if conf.relay:
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(app.node, server, relayMessageCache)
let cache = MessageCache[string].init(capacity=30)

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))

installRelayApiHandlers(app.node, server, cache)

if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)

Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

when isMainModule:
let rng = crypto.newRng()
Expand Down
6 changes: 3 additions & 3 deletions tests/test_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ procSuite "Peer Exchange":
peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange

await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler))
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))

# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
Expand Down
18 changes: 9 additions & 9 deletions tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ procSuite "Waku Discovery v5":
asyncSpawn node.subscriptionsListener(queue)

## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))

await sleepAsync(1.seconds)

Expand All @@ -426,9 +426,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))

await sleepAsync(1.seconds)

Expand All @@ -437,9 +437,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3))
queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit((kind: PubsubUnsub, topic: shard3))

await sleepAsync(1.seconds)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ suite "WakuNode":
msg.payload == payload
completionFut.complete(true)

node2.subscribe(pubSubTopic, relayHandler)
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)

await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)
await sleepAsync(2000.millis)

check:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ suite "WakuNode - Lightpush":
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))

# Wait for subscription to take effect
await sleepAsync(100.millis)
Expand Down
18 changes: 9 additions & 9 deletions tests/waku_relay/test_waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ suite "Waku Relay":
networkB = "test-network2"

## when
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())

## Then
check:
Expand All @@ -73,9 +73,9 @@ suite "Waku Relay":
networkB = "test-network2"
networkC = "test-network3"

nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkC, noopRawHandler())

let topics = toSeq(nodeA.subscribedTopics)
require:
Expand All @@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC)

## When
nodeA.unsubscribe(networkA)
nodeA.unsubscribeAll(networkA)

## Then
check:
Expand Down Expand Up @@ -129,14 +129,14 @@ suite "Waku Relay":
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message))

srcNode.subscribe(networkTopic, srcSubsHandler)
discard srcNode.subscribe(networkTopic, srcSubsHandler)

# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))

dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)

await sleepAsync(500.millis)

Expand Down Expand Up @@ -196,7 +196,7 @@ suite "Waku Relay":
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))

dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)

await sleepAsync(500.millis)

Expand Down
Loading
Loading