diff --git a/examples/v2/README.md b/examples/v2/README.md index 9032dcb8e8..1537d20942 100644 --- a/examples/v2/README.md +++ b/examples/v2/README.md @@ -4,7 +4,7 @@ TODO # publisher/subscriber -Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives. +Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publishes messages to the default pubsub topic on a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives. **Some notes:** * These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5. @@ -31,4 +31,34 @@ And run a publisher ./build/publisher ``` -See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations. \ No newline at end of file +See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations. + +# resource-restricted publisher/subscriber (lightpush/filter) + +To illustrate publishing and receiving messages on a resource-restricted client, +`examples/v2` also provides a `lightpush_publisher` and a `filter_subscriber`. +The `lightpush_publisher` continually publishes messages via a lightpush service node +to the default pubsub topic on a given content topic. +The `filter_subscriber` subscribes via a filter service node +to the same pubsub and content topic. +It runs forever, maintaining this subscription +and printing the content it receives. + +**compile and run:** + +Wait until the filter subscriber is ready. +```console +./env.sh bash +nim c -r examples/v2/filter_subscriber.nim +``` + +And run a lightpush publisher +```console +./env.sh bash +nim c -r examples/v2/lightpush_publisher.nim +``` + +See how the filter subscriber receives messages published by the lightpush publisher. +Neither the publisher nor the subscriber participates in `relay`, +but instead make use of service nodes to save resources. +Feel free to experiment from different machines in different locations. diff --git a/examples/v2/filter_subscriber.nim b/examples/v2/filter_subscriber.nim new file mode 100644 index 0000000000..d89651ec7d --- /dev/null +++ b/examples/v2/filter_subscriber.nim @@ -0,0 +1,82 @@ +## Example showing how a resource restricted client may +## subscribe to messages without relay + +import + chronicles, + chronos, + stew/byteutils, + stew/results +import + ../../../waku/common/logging, + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/waku_core, + ../../../waku/v2/waku_filter_v2/client + +const + FilterPeer = "/ip4/104.154.239.128/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" # node-01.gc-us-central1-a.wakuv2.test.statusim.net on wakuv2.test + FilterPubsubTopic = PubsubTopic("/waku/2/default-waku/proto") + FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") + +proc unsubscribe(wfc: WakuFilterClient, + filterPeer: RemotePeerInfo, + filterPubsubTopic: PubsubTopic, + filterContentTopic: ContentTopic) {.async.} = + notice "unsubscribing from filter" + let unsubscribeRes = await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic]) + if unsubscribeRes.isErr: + notice "unsubscribe request failed", err=unsubscribeRes.error + else: + notice "unsubscribe request successful" + +proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) = + let payloadStr = string.fromBytes(message.payload) + notice "message received", payload=payloadStr, + pubsubTopic=pubsubTopic, + contentTopic=message.contentTopic, + timestamp=message.timestamp + +proc maintainSubscription(wfc: WakuFilterClient, + filterPeer: RemotePeerInfo, + filterPubsubTopic: PubsubTopic, + filterContentTopic: ContentTopic) {.async.} = + while true: + notice "maintaining subscription" + # First use filter-ping to check if we have an active subscription + let pingRes = await wfc.ping(filterPeer) + if pingRes.isErr: + # No subscription found. Let's subscribe. + notice "no subscription found. Sending subscribe request" + + let subscribeRes = await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic]) + + if subscribeRes.isErr: + notice "subscribe request failed. Quitting.", err=subscribeRes.error + break + else: + notice "subscribe request successful." + else: + notice "subscription found." + + await sleepAsync(60.seconds) # Subscription maintenance interval + +proc setupAndSubscribe(rng: ref HmacDrbgContext) = + let filterPeer = parsePeerInfo(FilterPeer).get() + + setupLogLevel(logging.LogLevel.NOTICE) + notice "starting filter subscriber" + + var + switch = newStandardSwitch() + pm = PeerManager.new(switch) + wfc = WakuFilterClient.new(rng, messagePushHandler, pm) + + # Mount filter client protocol + switch.mount(wfc) + + # Start maintaining subscription + asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic) + +when isMainModule: + let rng = newRng() + setupAndSubscribe(rng) + runForever() diff --git a/examples/v2/lightpush_publisher.nim b/examples/v2/lightpush_publisher.nim new file mode 100644 index 0000000000..bcb753e517 --- /dev/null +++ b/examples/v2/lightpush_publisher.nim @@ -0,0 +1,57 @@ +## Example showing how a resource restricted client may +## use lightpush to publish messages without relay + +import + chronicles, + chronos, + stew/byteutils, + stew/results +import + ../../../waku/common/logging, + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/waku_core, + ../../../waku/v2/waku_lightpush/client + +const + LightpushPeer = "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" # node-01.do-ams3.wakuv2.test.statusim.net on wakuv2.test + LightpushPubsubTopic = PubsubTopic("/waku/2/default-waku/proto") + LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") + +proc publishMessages(wlc: WakuLightpushClient, + lightpushPeer: RemotePeerInfo, + lightpushPubsubTopic: PubsubTopic, + lightpushContentTopic: ContentTopic) {.async.} = + while true: + let text = "hi there i'm a lightpush publisher" + let message = WakuMessage(payload: toBytes(text), # content of the message + contentTopic: lightpushContentTopic, # content topic to publish to + ephemeral: true, # tell store nodes to not store it + timestamp: getNowInNanosecondTime()) # current timestamp + + let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer) + + if wlpRes.isOk(): + notice "published message using lightpush", message=message + else: + notice "failed to publish message using lightpush", err=wlpRes.error() + + await sleepAsync(5000) # Publish every 5 seconds + +proc setupAndPublish(rng: ref HmacDrbgContext) = + let lightpushPeer = parsePeerInfo(LightpushPeer).get() + + setupLogLevel(logging.LogLevel.NOTICE) + notice "starting lightpush publisher" + + var + switch = newStandardSwitch() + pm = PeerManager.new(switch) + wlc = WakuLightpushClient.new(pm, rng) + + # Start maintaining subscription + asyncSpawn publishMessages(wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic) + +when isMainModule: + let rng = newRng() + setupAndPublish(rng) + runForever()