From 77c5ba7669f0b170cc66f888226daab73f5bf2c9 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 11 Dec 2023 08:50:40 +0100 Subject: [PATCH] archive: simplify and enhance async retention policy application (#2278) * Avoid using timer and just use an infinite async loop that can be cancelled at any time. --- waku/node/waku_node.nim | 17 ++++--------- waku/waku_archive/archive.nim | 45 +++++++++++++---------------------- 2 files changed, 21 insertions(+), 41 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 41cf009e6e..f138b85818 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -740,7 +740,6 @@ proc filterUnsubscribeAll*(node: WakuNode, # yet incompatible to handle both type of filters - use specific filter registration instead ## Waku archive -const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes proc mountArchive*(node: WakuNode, driver: ArchiveDriver, retentionPolicy = none(RetentionPolicy)): @@ -760,18 +759,7 @@ proc mountArchive*(node: WakuNode, except CatchableError: return err("exception in mountArchive: " & getCurrentExceptionMsg()) - if retentionPolicy.isSome(): - try: - debug "executing message retention policy" - let retPolRes = waitFor node.wakuArchive.executeMessageRetentionPolicy() - if retPolRes.isErr(): - return err("error in mountArchive: " & retPolRes.error) - except CatchableError: - return err("exception in mountArch-ret-pol: " & getCurrentExceptionMsg()) - - node.wakuArchive.startMessageRetentionPolicyPeriodicTask( - WakuArchiveDefaultRetentionPolicyInterval) - + asyncSpawn node.wakuArchive.start() return ok() ## Waku store @@ -1174,6 +1162,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuRlnRelay.isNil(): await node.wakuRlnRelay.stop() + if not node.wakuArchive.isNil(): + await node.wakuArchive.stop() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async.} = diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index fef44efb02..ed0c21c430 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -68,6 +68,7 @@ type driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk validator: MessageValidator retentionPolicy: RetentionPolicy + retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future proc new*(T: type WakuArchive, driver: ArchiveDriver, @@ -189,19 +190,25 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { return ok(ArchiveResponse(messages: messages, cursor: cursor)) # Retention policy +const WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30) + +proc loopApplyRetentionPolicy*(w: WakuArchive): + Future[Result[void, string]] {.async.} = -proc executeMessageRetentionPolicy*(w: WakuArchive): - Future[Result[void, string]] {.async.} = if w.retentionPolicy.isNil(): return err("retentionPolicy is Nil in executeMessageRetentionPolicy") if w.driver.isNil(): return err("driver is Nil in executeMessageRetentionPolicy") - let retPolicyRes = await w.retentionPolicy.execute(w.driver) - if retPolicyRes.isErr(): - waku_archive_errors.inc(labelValues = [retPolicyFailure]) - return err("failed execution of retention policy: " & retPolicyRes.error) + while true: + debug "executing message retention policy" + let retPolicyRes = await w.retentionPolicy.execute(w.driver) + if retPolicyRes.isErr(): + waku_archive_errors.inc(labelValues = [retPolicyFailure]) + error "failed execution of retention policy", error=retPolicyRes.error + + await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval) return ok() @@ -218,26 +225,8 @@ proc reportStoredMessagesMetric*(w: WakuArchive): return ok() -proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive, - interval: timer.Duration) = - # Start the periodic message retention policy task - # https://github.com/nim-lang/Nim/issues/17369 - - var executeRetentionPolicy: CallbackFunc - executeRetentionPolicy = - CallbackFunc( - proc (arg: pointer) {.gcsafe, raises: [].} = - try: - let retPolRes = waitFor w.executeMessageRetentionPolicy() - if retPolRes.isErr(): - waku_archive_errors.inc(labelValues = [retPolicyFailure]) - error "error in periodic retention policy", error = retPolRes.error - except CatchableError: - waku_archive_errors.inc(labelValues = [retPolicyFailure]) - error "exception in periodic retention policy", - error = getCurrentExceptionMsg() - - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) - ) +proc start*(self: WakuArchive) {.async.} = + self.retPolicyFut = self.loopApplyRetentionPolicy() - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) +proc stop*(self: WakuArchive) {.async.} = + self.retPolicyFut.cancel()