Skip to content

Commit

Permalink
Enabling to use a full node for lightpush via rest api without light …
Browse files Browse the repository at this point in the history
…push client configured
  • Loading branch information
NagyZoltanPeter committed Apr 25, 2024
1 parent 7f8d8e8 commit c418281
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 23 deletions.
52 changes: 37 additions & 15 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_lightpush/self_req_handler,
../waku_enr,
../waku_peer_exchange,
../waku_rln_relay,
Expand Down Expand Up @@ -813,7 +814,7 @@ proc mountLightPush*(

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
debug("Lightpush request has not been published to any peers")
debug "Lightpush request has not been published to any peers"

return ok()

Expand Down Expand Up @@ -842,15 +843,33 @@ proc lightpushPublish*(
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
if node.wakuLightpushClient.isNil():
return err("waku lightpush client is nil")
let internalPublish = proc(
node: WakuNode,
pubsubTopic: PubsubTopic,
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
if not node.wakuLightpushClient.isNil():
debug "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
peer = peer.peerId
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

if not node.wakuLightPush.isNil():
debug "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)

if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
return err("waku lightpush not available")

if pubsubTopic.isSome():
debug "publishing message with lightpush",
pubsubTopic = pubsubTopic.get(),
contentTopic = message.contentTopic,
peer = peer.peerId
return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer)
return await internalPublish(node, pubsubTopic.get(), message, peer)

let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic)

Expand All @@ -863,24 +882,27 @@ proc lightpushPublish*(
for pubsub, _ in topicMap.pairs: # There's only one pair anyway
debug "publishing message with lightpush",
pubsubTopic = pubsub, contentTopic = message.contentTopic, peer = peer.peerId
return await node.wakuLightpushClient.publish($pubsub, message, peer)
return await internalPublish(node, $pubsub, message, peer)

# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish*(
node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage
): Future[WakuLightPushResult[void]] {.
async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead"
.} =
if node.wakuLightpushClient.isNil():
let msg = "waku lightpush client is nil"
error "failed to publish message", msg = msg
return err(msg)

let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let msg = "no suitable remote peers"
error "failed to publish message", msg = msg
return err(msg)
if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil():
error "failed to publish message as lightpush not available"
return err("waku lightpush not available")

var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo)
if not node.wakuLightpushClient.isNil():
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let msg = "no suitable remote peers"
error "failed to publish message", msg = msg
return err(msg)
elif not node.wakuLightPush.isNil():
peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId))

let publishRes =
await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get())
Expand Down
5 changes: 4 additions & 1 deletion waku/waku_api/rest/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ proc startRestServerProtocolSupport*(
installStoreApiHandlers(router, node, storeDiscoHandler)

## Light push API
if conf.lightpushnode != "" and node.wakuLightpushClient != nil:
## Install it either if lightpushnode (lightpush service node) is configured and client is mounted)
## or install it to be used with self-hosted lightpush service
if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or
(conf.lightpush and node.wakuLightPush != nil):
let lightDiscoHandler =
if wakuDiscv5.isSome():
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
Expand Down
22 changes: 15 additions & 7 deletions waku/waku_api/rest/lightpush/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import
../../waku/node/peer_manager,
../../../waku_node,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/self_req_handler,
../../handlers,
../serdes,
../responses,
Expand All @@ -35,6 +36,9 @@ const NoPeerNoDiscoError =
const NoPeerNoneFoundError =
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")

proc useSelfHostedLightPush(node: WakuNode): bool =
return node.wakuLightPush != nil and node.wakuLightPushClient == nil

#### Request handlers

const ROUTE_LIGHTPUSH* = "/lightpush/v1/message"
Expand All @@ -60,15 +64,19 @@ proc installLightPushRequestHandler*(
let msg = req.message.toWakuMessage().valueOr:
return RestApiResponse.badRequest("Invalid message: " & $error)

let peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscoError
var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId)
if useSelfHostedLightPush(node):
discard
else:
peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscoError

let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError("No value in peerOp: " & $error)
let peerOp = (await handler()).valueOr:
return RestApiResponse.internalServerError("No value in peerOp: " & $error)

peerOp.valueOr:
return NoPeerNoneFoundError
peerOp.valueOr:
return NoPeerNoneFoundError

let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer)

Expand Down
54 changes: 54 additions & 0 deletions waku/waku_lightpush/self_req_handler.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
##
## This file is aimed to attend the requests that come directly
## from the 'self' node. It is expected to attend the store requests that
## come from REST-store endpoint when those requests don't indicate
## any store-peer address.
##
## Notice that the REST-store requests normally assume that the REST
## server is acting as a store-client. In this module, we allow that
## such REST-store node can act as store-server as well by retrieving
## its own stored messages. The typical use case for that is when
## using `nwaku-compose`, which spawn a Waku node connected to a local
## database, and the user is interested in retrieving the messages
## stored by that local store node.
##

import stew/results, chronos, chronicles, std/options, metrics
import
../waku_core,
./protocol,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../utils/requests

proc handleSelfLightPushRequest*(
self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
## Handles the lightpush requests made by the node to itself.
## Normally used in REST-lightpush requests

try:
# provide self peerId as now this node is used directly, thus there is no light client sender peer.
let selfPeerId = self.peerManager.switch.peerInfo.peerId

let req = PushRequest(pubSubTopic: pubSubTopic, message: message)
let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req))

let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer)

if respRpc.response.isNone():
waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure])
return err(emptyResponseBodyFailure)

let response = respRpc.response.get()
if not response.isSuccess:
if response.info.isSome():
return err(response.info.get())
else:
return err("unknown failure")

return ok()
except Exception:
return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())

0 comments on commit c418281

Please sign in to comment.