Skip to content

Commit

Permalink
Merge 4a8c5b8 into 0cc0c80
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Dec 13, 2023
2 parents 0cc0c80 + 4a8c5b8 commit c2f34ca
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
8 changes: 0 additions & 8 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -751,14 +751,6 @@ proc mountArchive*(node: WakuNode,
return err("error in mountArchive: " & wakuArchiveRes.error)

node.wakuArchive = wakuArchiveRes.get()

try:
let reportMetricRes = waitFor node.wakuArchive.reportStoredMessagesMetric()
if reportMetricRes.isErr():
return err("error in mountArchive: " & reportMetricRes.error)
except CatchableError:
return err("exception in mountArchive: " & getCurrentExceptionMsg())

asyncSpawn node.wakuArchive.start()
return ok()

Expand Down
23 changes: 16 additions & 7 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type
validator: MessageValidator
retentionPolicy: RetentionPolicy
retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future
retMetricsRepFut: Future[Result[void, string]] ## metrics reporting cancelable future

proc new*(T: type WakuArchive,
driver: ArchiveDriver,
Expand Down Expand Up @@ -212,21 +213,29 @@ proc loopApplyRetentionPolicy*(w: WakuArchive):

return ok()

proc reportStoredMessagesMetric*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
# Metrics reporting
const WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)

proc loopReportStoredMessagesMetric*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.driver.isNil():
return err("driver is Nil in reportStoredMessagesMetric")
return err("driver is Nil in loopReportStoredMessagesMetric")

let resCount = await w.driver.getMessagesCount()
if resCount.isErr():
return err("failed to get messages count: " & resCount.error)
while true:
let resCount = await w.driver.getMessagesCount()
if resCount.isErr():
return err("loopReportStoredMessagesMetric failed to get messages count: " & resCount.error)

waku_archive_messages.set(resCount.value, labelValues = ["stored"])
waku_archive_messages.set(resCount.value, labelValues = ["stored"])
await sleepAsync(WakuArchiveDefaultMetricsReportInterval)

return ok()

proc start*(self: WakuArchive) {.async.} =
## TODO: better control the Result in case of error. Now it is ignored
self.retPolicyFut = self.loopApplyRetentionPolicy()
self.retMetricsRepFut = self.loopReportStoredMessagesMetric()

proc stop*(self: WakuArchive) {.async.} =
self.retPolicyFut.cancel()
self.retMetricsRepFut.cancel()

0 comments on commit c2f34ca

Please sign in to comment.