diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index f869de5794..6d6f2cdb34 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -44,6 +44,7 @@ import ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, + ../waku_lightpush/self_req_handler, ../waku_enr, ../waku_peer_exchange, ../waku_rln_relay, @@ -913,7 +914,7 @@ proc mountLightPush*( if publishedCount == 0: ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 - debug("Lightpush request has not been published to any peers") + debug "Lightpush request has not been published to any peers" return ok() @@ -942,15 +943,30 @@ proc lightpushPublish*( ## Returns whether relaying was successful or not. ## `WakuMessage` should contain a `contentTopic` field for light node ## functionality. - if node.wakuLightpushClient.isNil(): - return err("waku lightpush client is nil") + if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): + error "failed to publish message as lightpush not available" + return err("Waku lightpush not available") + + let internalPublish = proc( + node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage, + peer: RemotePeerInfo, + ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + if not node.wakuLightpushClient.isNil(): + debug "publishing message with lightpush", + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + peer = peer.peerId + return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) + + if not node.wakuLightPush.isNil(): + debug "publishing message with self hosted lightpush", + pubsubTopic = pubsubTopic, contentTopic = message.contentTopic + return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) if pubsubTopic.isSome(): - debug "publishing message with lightpush", - pubsubTopic = pubsubTopic.get(), - contentTopic = message.contentTopic, - peer = peer.peerId - return await node.wakuLightpushClient.publish(pubsubTopic.get(), message, peer) + return await internalPublish(node, pubsubTopic.get(), message, peer) let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic) @@ -961,9 +977,7 @@ proc lightpushPublish*( topicMapRes.get() for pubsub, _ in topicMap.pairs: # There's only one pair anyway - debug "publishing message with lightpush", - pubsubTopic = pubsub, contentTopic = message.contentTopic, peer = peer.peerId - return await node.wakuLightpushClient.publish($pubsub, message, peer) + return await internalPublish(node, $pubsub, message, peer) # TODO: Move to application module (e.g., wakunode2.nim) proc lightpushPublish*( @@ -971,16 +985,19 @@ proc lightpushPublish*( ): Future[WakuLightPushResult[void]] {. async, gcsafe, deprecated: "Use 'node.lightpushPublish()' instead" .} = - if node.wakuLightpushClient.isNil(): - let msg = "waku lightpush client is nil" - error "failed to publish message", msg = msg - return err(msg) - - let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) - if peerOpt.isNone(): - let msg = "no suitable remote peers" - error "failed to publish message", msg = msg - return err(msg) + if node.wakuLightpushClient.isNil() and node.wakuLightPush.isNil(): + error "failed to publish message as lightpush not available" + return err("waku lightpush not available") + + var peerOpt: Option[RemotePeerInfo] = none(RemotePeerInfo) + if not node.wakuLightpushClient.isNil(): + peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) + if peerOpt.isNone(): + let msg = "no suitable remote peers" + error "failed to publish message", msg = msg + return err(msg) + elif not node.wakuLightPush.isNil(): + peerOpt = some(RemotePeerInfo.init($node.switch.peerInfo.peerId)) let publishRes = await node.lightpushPublish(pubsubTopic, message, peer = peerOpt.get()) diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 9ab709afd3..2c7466f969 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -177,7 +177,10 @@ proc startRestServerProtocolSupport*( rest_store_legacy_api.installStoreApiHandlers(router, node, storeDiscoHandler) ## Light push API - if conf.lightpushnode != "" and node.wakuLightpushClient != nil: + ## Install it either if lightpushnode (lightpush service node) is configured and client is mounted) + ## or install it to be used with self-hosted lightpush service + if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or + (conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil): let lightDiscoHandler = if wakuDiscv5.isSome(): some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush)) diff --git a/waku/waku_api/rest/lightpush/handlers.nim b/waku/waku_api/rest/lightpush/handlers.nim index 3aa52da81d..9a0611c8b2 100644 --- a/waku/waku_api/rest/lightpush/handlers.nim +++ b/waku/waku_api/rest/lightpush/handlers.nim @@ -16,6 +16,7 @@ import ../../waku/node/peer_manager, ../../../waku_node, ../../waku/waku_lightpush/common, + ../../waku/waku_lightpush/self_req_handler, ../../handlers, ../serdes, ../responses, @@ -35,6 +36,9 @@ const NoPeerNoDiscoError = const NoPeerNoneFoundError = RestApiResponse.serviceUnavailable("No suitable service peer & none discovered") +proc useSelfHostedLightPush(node: WakuNode): bool = + return node.wakuLightPush != nil and node.wakuLightPushClient == nil + #### Request handlers const ROUTE_LIGHTPUSH* = "/lightpush/v1/message" @@ -60,15 +64,19 @@ proc installLightPushRequestHandler*( let msg = req.message.toWakuMessage().valueOr: return RestApiResponse.badRequest("Invalid message: " & $error) - let peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr: - let handler = discHandler.valueOr: - return NoPeerNoDiscoError + var peer = RemotePeerInfo.init($node.switch.peerInfo.peerId) + if useSelfHostedLightPush(node): + discard + else: + peer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + let handler = discHandler.valueOr: + return NoPeerNoDiscoError - let peerOp = (await handler()).valueOr: - return RestApiResponse.internalServerError("No value in peerOp: " & $error) + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError("No value in peerOp: " & $error) - peerOp.valueOr: - return NoPeerNoneFoundError + peerOp.valueOr: + return NoPeerNoneFoundError let subFut = node.lightpushPublish(req.pubsubTopic, msg, peer) diff --git a/waku/waku_lightpush/self_req_handler.nim b/waku/waku_lightpush/self_req_handler.nim new file mode 100644 index 0000000000..b08a87035d --- /dev/null +++ b/waku/waku_lightpush/self_req_handler.nim @@ -0,0 +1,53 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +## Notice that the REST /lightpush requests normally assume that the node +## is acting as a lightpush-client that will trigger the service provider node +## to relay the message. +## In this module, we allow that a lightpush service node (full node) can be +## triggered directly through the REST /lightpush endpoint. +## The typical use case for that is when using `nwaku-compose`, +## which spawn a full service Waku node +## that could be used also as a lightpush client, helping testing and development. + +import stew/results, chronos, chronicles, std/options, metrics +import + ../waku_core, + ./protocol, + ./common, + ./rpc, + ./rpc_codec, + ./protocol_metrics, + ../utils/requests + +proc handleSelfLightPushRequest*( + self: WakuLightPush, pubSubTopic: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult[void]] {.async.} = + ## Handles the lightpush requests made by the node to itself. + ## Normally used in REST-lightpush requests + + try: + # provide self peerId as now this node is used directly, thus there is no light client sender peer. + let selfPeerId = self.peerManager.switch.peerInfo.peerId + + let req = PushRequest(pubSubTopic: pubSubTopic, message: message) + let rpc = PushRPC(requestId: generateRequestId(self.rng), request: some(req)) + + let respRpc = await self.handleRequest(selfPeerId, rpc.encode().buffer) + + if respRpc.response.isNone(): + waku_lightpush_errors.inc(labelValues = [emptyResponseBodyFailure]) + return err(emptyResponseBodyFailure) + + let response = respRpc.response.get() + if not response.isSuccess: + if response.info.isSome(): + return err(response.info.get()) + else: + return err("unknown failure") + + return ok() + except Exception: + return err("exception in handleSelfLightPushRequest: " & getCurrentExceptionMsg())