-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
RestApi Lightpush endpoint implemented * Openapi definition for lightpush rest api * Update waku/node/rest/lightpush/handlers.nim * Fix install handler naming, added negative test cases and fixes for restapi lightpush * Fix error handling in lightpush rest handler * Fix main success case - relay message that comes with pushRequest * Fix rest relay serdes test with RelayWakuMessage validation changes * Checking response message tests
- Loading branch information
1 parent
dc25057
commit 02a814b
Showing
11 changed files
with
502 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
{.used.} | ||
|
||
import | ||
std/sequtils, | ||
stew/byteutils, | ||
stew/shims/net, | ||
testutils/unittests, | ||
presto, presto/client as presto_client, | ||
libp2p/crypto/crypto | ||
|
||
import | ||
../../waku/node/message_cache, | ||
../../waku/common/base64, | ||
../../waku/waku_core, | ||
../../waku/waku_node, | ||
../../waku/node/peer_manager, | ||
../../waku/waku_lightpush, | ||
../../waku/node/rest/server, | ||
../../waku/node/rest/client, | ||
../../waku/node/rest/responses, | ||
../../waku/node/rest/lightpush/types, | ||
../../waku/node/rest/lightpush/handlers as lightpush_api, | ||
../../waku/node/rest/lightpush/client as lightpush_api_client, | ||
../../waku/waku_relay, | ||
../testlib/wakucore, | ||
../testlib/wakunode | ||
|
||
|
||
proc testWakuNode(): WakuNode = | ||
let | ||
privkey = generateSecp256k1Key() | ||
bindIp = ValidIpAddress.init("0.0.0.0") | ||
extIp = ValidIpAddress.init("127.0.0.1") | ||
port = Port(0) | ||
|
||
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) | ||
|
||
|
||
type RestLightPushTest = object | ||
serviceNode: WakuNode | ||
pushNode: WakuNode | ||
consumerNode: WakuNode | ||
restServer: RestServerRef | ||
client: RestClientRef | ||
|
||
|
||
proc init(T: type RestLightPushTest): Future[T] {.async.} = | ||
var testSetup = RestLightPushTest() | ||
testSetup.serviceNode = testWakuNode() | ||
testSetup.pushNode = testWakuNode() | ||
testSetup.consumerNode = testWakuNode() | ||
|
||
await allFutures(testSetup.serviceNode.start(), | ||
testSetup.pushNode.start(), | ||
testSetup.consumerNode.start()) | ||
|
||
await testSetup.consumerNode.mountRelay() | ||
await testSetup.serviceNode.mountRelay() | ||
await testSetup.serviceNode.mountLightPush() | ||
testSetup.pushNode.mountLightPushClient() | ||
|
||
|
||
testSetup.serviceNode.peerManager.addServicePeer( | ||
testSetup.consumerNode.peerInfo.toRemotePeerInfo(), | ||
WakuRelayCodec) | ||
|
||
await testSetup.serviceNode.connectToNodes(@[testSetup.consumerNode.peerInfo.toRemotePeerInfo()]) | ||
|
||
testSetup.pushNode.peerManager.addServicePeer( | ||
testSetup.serviceNode.peerInfo.toRemotePeerInfo(), | ||
WakuLightPushCodec) | ||
|
||
let restPort = Port(58011) | ||
let restAddress = ValidIpAddress.init("127.0.0.1") | ||
testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet() | ||
|
||
installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode) | ||
|
||
testSetup.restServer.start() | ||
|
||
testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort)) | ||
|
||
return testSetup | ||
|
||
|
||
proc shutdown(self: RestLightPushTest) {.async.} = | ||
await self.restServer.stop() | ||
await self.restServer.closeWait() | ||
await allFutures(self.serviceNode.stop(), self.pushNode.stop()) | ||
|
||
|
||
suite "Waku v2 Rest API - lightpush": | ||
asyncTest "Push message request": | ||
# Given | ||
let restLightPushTest = await RestLightPushTest.init() | ||
|
||
restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic) | ||
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) | ||
require: | ||
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 | ||
|
||
# When | ||
let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, | ||
payload = toBytes("TEST-1")).toRelayWakuMessage() | ||
|
||
let requestBody = PushRequest(pubsubTopic: some(DefaultPubsubTopic), | ||
message: message) | ||
let response = await restLightPushTest.client.sendPushRequest(requestBody) | ||
|
||
echo "response", $response | ||
|
||
# Then | ||
check: | ||
response.status == 200 | ||
$response.contentType == $MIMETYPE_TEXT | ||
|
||
await restLightPushTest.shutdown() | ||
|
||
asyncTest "Push message bad-request": | ||
# Given | ||
let restLightPushTest = await RestLightPushTest.init() | ||
|
||
restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) | ||
require: | ||
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 | ||
|
||
# When | ||
let badMessage1 : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, | ||
payload = toBytes("")).toRelayWakuMessage() | ||
let badRequestBody1 = PushRequest(pubsubTopic: some(DefaultPubsubTopic), | ||
message: badMessage1) | ||
|
||
let badMessage2 : RelayWakuMessage = fakeWakuMessage(contentTopic = "", | ||
payload = toBytes("Sthg")).toRelayWakuMessage() | ||
let badRequestBody2 = PushRequest(pubsubTopic: some(DefaultPubsubTopic), | ||
message: badMessage2) | ||
|
||
let badRequestBody3 = PushRequest(pubsubTopic: none(PubsubTopic), | ||
message: badMessage2) | ||
|
||
var response: RestResponse[string] | ||
|
||
response = await restLightPushTest.client.sendPushRequest(badRequestBody1) | ||
|
||
echo "response", $response | ||
|
||
# Then | ||
check: | ||
response.status == 400 | ||
$response.contentType == $MIMETYPE_TEXT | ||
response.data.startsWith("Invalid content body") | ||
|
||
|
||
# when | ||
response = await restLightPushTest.client.sendPushRequest(badRequestBody2) | ||
|
||
# Then | ||
check: | ||
response.status == 400 | ||
$response.contentType == $MIMETYPE_TEXT | ||
response.data.startsWith("Invalid content body") | ||
|
||
# when | ||
response = await restLightPushTest.client.sendPushRequest(badRequestBody3) | ||
|
||
# Then | ||
check: | ||
response.status == 400 | ||
$response.contentType == $MIMETYPE_TEXT | ||
response.data.startsWith("Invalid content body") | ||
|
||
await restLightPushTest.shutdown() | ||
|
||
asyncTest "Push message request service not available": | ||
# Given | ||
let restLightPushTest = await RestLightPushTest.init() | ||
|
||
# When | ||
let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, | ||
payload = toBytes("TEST-1")).toRelayWakuMessage() | ||
|
||
let requestBody = PushRequest(pubsubTopic: some("NoExistTopic"), | ||
message: message) | ||
let response = await restLightPushTest.client.sendPushRequest(requestBody) | ||
|
||
echo "response", $response | ||
|
||
# Then | ||
check: | ||
response.status == 503 | ||
$response.contentType == $MIMETYPE_TEXT | ||
response.data == "Failed to request a message push: Can not publish to any peers" | ||
|
||
await restLightPushTest.shutdown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
when (NimMajor, NimMinor) < (1, 4): | ||
{.push raises: [Defect].} | ||
else: | ||
{.push raises: [].} | ||
|
||
import | ||
json, | ||
std/sets, | ||
stew/byteutils, | ||
strformat, | ||
chronicles, | ||
json_serialization, | ||
json_serialization/std/options, | ||
presto/[route, client, common] | ||
import | ||
../../../waku_core, | ||
../serdes, | ||
../responses, | ||
./types | ||
|
||
export types | ||
|
||
logScope: | ||
topics = "waku node rest client v2" | ||
|
||
proc encodeBytes*(value: PushRequest, | ||
contentType: string): RestResult[seq[byte]] = | ||
if MediaType.init(contentType) != MIMETYPE_JSON: | ||
error "Unsupported contentType value", contentType = contentType | ||
return err("Unsupported contentType") | ||
|
||
let encoded = ?encodeIntoJsonBytes(value) | ||
return ok(encoded) | ||
|
||
proc decodeBytes*(t: typedesc[string], value: openarray[byte], | ||
contentType: Opt[ContentTypeData]): RestResult[string] = | ||
if MediaType.init($contentType) != MIMETYPE_TEXT: | ||
error "Unsupported contentType value", contentType = contentType | ||
return err("Unsupported contentType") | ||
|
||
var res: string | ||
if len(value) > 0: | ||
res = newString(len(value)) | ||
copyMem(addr res[0], unsafeAddr value[0], len(value)) | ||
return ok(res) | ||
|
||
proc sendPushRequest*(body: PushRequest): | ||
RestResponse[string] | ||
{.rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost.} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
when (NimMajor, NimMinor) < (1, 4): | ||
{.push raises: [Defect].} | ||
else: | ||
{.push raises: [].} | ||
|
||
import | ||
std/strformat, | ||
std/sequtils, | ||
stew/byteutils, | ||
chronicles, | ||
json_serialization, | ||
json_serialization/std/options, | ||
presto/route, | ||
presto/common | ||
|
||
import | ||
../../../waku_core, | ||
../../peer_manager, | ||
../../waku_node, | ||
../../waku/waku_lightpush, | ||
../serdes, | ||
../responses, | ||
./types | ||
|
||
export types | ||
|
||
logScope: | ||
topics = "waku node rest lightpush api" | ||
|
||
const futTimeoutForPushRequestProcessing* = 5.seconds | ||
|
||
#### Request handlers | ||
|
||
const ROUTE_LIGHTPUSH* = "/lightpush/v1/message" | ||
|
||
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = | ||
if contentBody.isNone(): | ||
return err(RestApiResponse.badRequest("Missing content body")) | ||
|
||
let reqBodyContentType = MediaType.init($contentBody.get().contentType) | ||
if reqBodyContentType != MIMETYPE_JSON: | ||
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) | ||
|
||
let reqBodyData = contentBody.get().data | ||
|
||
let requestResult = decodeFromJsonBytes(T, reqBodyData) | ||
if requestResult.isErr(): | ||
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & | ||
$requestResult.error)) | ||
|
||
return ok(requestResult.get()) | ||
|
||
proc installLightPushRequestHandler*(router: var RestRouter, | ||
node: WakuNode) = | ||
|
||
router.api(MethodPost, ROUTE_LIGHTPUSH) do (contentBody: Option[ContentBody]) -> RestApiResponse: | ||
## Send a request to push a waku message | ||
debug "post", ROUTE_LIGHTPUSH, contentBody | ||
|
||
let decodedBody = decodeRequestBody[PushRequest](contentBody) | ||
|
||
if decodedBody.isErr(): | ||
return decodedBody.error() | ||
|
||
let req: PushRequest = decodedBody.value() | ||
let msg = req.message.toWakuMessage() | ||
|
||
if msg.isErr(): | ||
return RestApiResponse.badRequest("Invalid message: {msg.error}") | ||
|
||
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) | ||
if peerOpt.isNone(): | ||
return RestApiResponse.serviceUnavailable("No suitable remote lightpush peers") | ||
|
||
let subFut = node.lightpushPublish(req.pubsubTopic, | ||
msg.value(), | ||
peerOpt.get()) | ||
|
||
if not await subFut.withTimeout(futTimeoutForPushRequestProcessing): | ||
error "Failed to request a message push due to timeout!" | ||
return RestApiResponse.serviceUnavailable("Push request timed out") | ||
|
||
if subFut.value().isErr(): | ||
return RestApiResponse.serviceUnavailable(fmt("Failed to request a message push: {subFut.value().error}")) | ||
|
||
return RestApiResponse.ok() |
Oops, something went wrong.