Skip to content

Commit

Permalink
chore: improve POST /relay/v1/auto/messages/{topic} error handling (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Jan 18, 2024
1 parent 8a9fad2 commit f841454
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 65 deletions.
15 changes: 10 additions & 5 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,16 @@ proc publish(c: Chat, line: string) =
# update the last epoch
c.node.wakuRlnRelay.lastEpoch = proof.epoch

if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)
try:
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
(waitFor c.node.lightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish lightpush message", error = error
else:
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish message", error = error
except CatchableError:
error "caught error publishing message: ", error = getCurrentExceptionMsg()

# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
Expand Down
3 changes: 2 additions & 1 deletion apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =

chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])

await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)
(await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)).isOkOr:
error "failed to publish message", error = error

proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
Expand Down
10 changes: 8 additions & 2 deletions examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic

let res = await node.publish(some(pubSubTopic), message)

if res.isOk:
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
else:
error "failed to publish message", error = res.error

await sleepAsync(5000)

when isMainModule:
Expand Down
4 changes: 3 additions & 1 deletion tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ suite "WakuNode":
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)

await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(2000.millis)

check:
Expand Down
3 changes: 2 additions & 1 deletion tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ suite "WakuNode - Lightpush":
await sleepAsync(100.millis)

## When
await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error

## Then
check await completionFutRelay.withTimeout(5.seconds)
Expand Down
31 changes: 23 additions & 8 deletions tests/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ suite "WakuNode - Relay":
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

## Then
check:
Expand Down Expand Up @@ -176,11 +177,15 @@ suite "WakuNode - Relay":
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node1.publish(some(pubSubTopic), message1)
var res = await node1.publish(some(pubSubTopic), message1)
assert res.isOk(), $res.error

await sleepAsync(500.millis)

# message2 never gets relayed because of the validator
await node1.publish(some(pubSubTopic), message2)
res = await node1.publish(some(pubSubTopic), message2)
assert res.isOk(), $res.error

await sleepAsync(500.millis)

check:
Expand Down Expand Up @@ -257,7 +262,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(500.millis)


Expand Down Expand Up @@ -298,7 +305,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(500.millis)


Expand Down Expand Up @@ -343,7 +352,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(500.millis)

check:
Expand Down Expand Up @@ -383,7 +394,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(500.millis)

check:
Expand Down Expand Up @@ -423,7 +436,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)

await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error

await sleepAsync(500.millis)


Expand Down
22 changes: 11 additions & 11 deletions tests/waku_rln_relay/test_wakunode_rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ procSuite "WakuNode - RLN relay":
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(some(DefaultPubsubTopic), message)
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)


Expand Down Expand Up @@ -165,8 +165,8 @@ procSuite "WakuNode - RLN relay":

# publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1: await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: await nodes[1].publish(some(pubsubTopics[1]), msg)
for msg in messages1: discard await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: discard await nodes[1].publish(some(pubsubTopics[1]), msg)

# wait for gossip to propagate
await sleepAsync(5000.millis)
Expand Down Expand Up @@ -266,7 +266,7 @@ procSuite "WakuNode - RLN relay":
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
## never gets called
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(some(DefaultPubsubTopic), message)
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)

check:
Expand Down Expand Up @@ -378,10 +378,10 @@ procSuite "WakuNode - RLN relay":
## node2 should detect either of wm1 or wm2 as spam and not relay it
## node2 should relay wm3 to node3
## node2 should not relay wm4 because it has no valid rln proof
await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(some(DefaultPubsubTopic), wm3)
await node1.publish(some(DefaultPubsubTopic), wm4)
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)
discard await node1.publish(some(DefaultPubsubTopic), wm4)
await sleepAsync(2000.millis)

let
Expand Down Expand Up @@ -474,9 +474,9 @@ procSuite "WakuNode - RLN relay":
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)

await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(some(DefaultPubsubTopic), wm3)
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)

let
res1 = await completionFut1.withTimeout(10.seconds)
Expand Down
16 changes: 8 additions & 8 deletions tests/wakunode2/test_validators.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ suite "WakuNode2 - Validators":
# Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some(spamProtectedTopic), msg)

# Wait for gossip
await sleepAsync(2.seconds)
Expand Down Expand Up @@ -146,23 +146,23 @@ suite "WakuNode2 - Validators":
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some(spamProtectedTopic), msg)

# Each node sends 5 messages that are not signed (total = 25)
for i in 0..<5:
for j in 0..<5:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)

# Each node sends 5 messages that dont contain timestamp (total = 25)
for i in 0..<5:
for j in 0..<5:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: 0, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)

# Each node sends 5 messages way BEFORE than the current timestmap (total = 25)
for i in 0..<5:
Expand All @@ -171,7 +171,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: beforeTimestamp, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)

# Each node sends 5 messages way LATER than the current timestmap (total = 25)
for i in 0..<5:
Expand All @@ -180,7 +180,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: afterTimestamp, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)

# Wait for gossip
await sleepAsync(4.seconds)
Expand Down Expand Up @@ -255,7 +255,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)

# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
Expand All @@ -264,7 +264,7 @@ suite "WakuNode2 - Validators":
version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(some(spamProtectedTopic), msg)
discard await nodes[0].publish(some(spamProtectedTopic), msg)

# Wait for gossip
await sleepAsync(2.seconds)
Expand Down
4 changes: 2 additions & 2 deletions tests/wakunode_jsonrpc/test_jsonrpc_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ suite "Waku v2 JSON-RPC API - Relay":

## When
for msg in messages:
await srcNode.publish(some(pubSubTopic), msg)
discard await srcNode.publish(some(pubSubTopic), msg)

await sleepAsync(200.millis)

Expand Down Expand Up @@ -261,7 +261,7 @@ suite "Waku v2 JSON-RPC API - Relay":

## When
for msg in messages:
await srcNode.publish(none(PubsubTopic), msg)
discard await srcNode.publish(none(PubsubTopic), msg)

await sleepAsync(200.millis)

Expand Down
Loading

0 comments on commit f841454

Please sign in to comment.