Skip to content

Commit

Permalink
fix: add log and archive message ingress for sync (#3133)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Oct 23, 2024
1 parent 8106abb commit 80c7581
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
37 changes: 35 additions & 2 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ proc handleMessage*(
) {.async.} =
self.validator(msg).isOkOr:
waku_archive_errors.inc(labelValues = [error])
trace "invalid message",
msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return

let msgHash = computeMessageHash(pubsubTopic, msg)
Expand All @@ -95,15 +101,42 @@ proc handleMessage*(
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
trace "failed to insert message",
hash_hash = msgHash.to0xHex(),
msg_hash = msgHash.to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return

trace "message archived",
msg_hash = msgHash.to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp

let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

proc syncMessageIngress*(
self: WakuArchive,
msgHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
msg: WakuMessage,
) {.async.} =
let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
trace "failed to insert message",
msg_hash = msgHash.to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return

trace "message archived",
hash_hash = msgHash.to0xHex(),
msg_hash = msgHash.to0xHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp
Expand Down
4 changes: 3 additions & 1 deletion waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ proc createTransferCallback(

for kv in response.messages:
let handleRes = catch:
await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
await wakuArchive.syncMessageIngress(
kv.messageHash, kv.pubsubTopic.get(), kv.message.get()
)

if handleRes.isErr():
error "message transfer failed", error = handleRes.error.msg
Expand Down

0 comments on commit 80c7581

Please sign in to comment.