Skip to content

Commit

Permalink
archive: simplify and enhance async retention policy application (#2278)
Browse files Browse the repository at this point in the history
* Avoid using timer and just use an infinite async loop that can be
cancelled at any time.
  • Loading branch information
Ivansete-status authored Dec 11, 2023
1 parent 45b0be8 commit 77c5ba7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 41 deletions.
17 changes: 4 additions & 13 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,6 @@ proc filterUnsubscribeAll*(node: WakuNode,
# yet incompatible to handle both type of filters - use specific filter registration instead

## Waku archive
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes
proc mountArchive*(node: WakuNode,
driver: ArchiveDriver,
retentionPolicy = none(RetentionPolicy)):
Expand All @@ -760,18 +759,7 @@ proc mountArchive*(node: WakuNode,
except CatchableError:
return err("exception in mountArchive: " & getCurrentExceptionMsg())

if retentionPolicy.isSome():
try:
debug "executing message retention policy"
let retPolRes = waitFor node.wakuArchive.executeMessageRetentionPolicy()
if retPolRes.isErr():
return err("error in mountArchive: " & retPolRes.error)
except CatchableError:
return err("exception in mountArch-ret-pol: " & getCurrentExceptionMsg())

node.wakuArchive.startMessageRetentionPolicyPeriodicTask(
WakuArchiveDefaultRetentionPolicyInterval)

asyncSpawn node.wakuArchive.start()
return ok()

## Waku store
Expand Down Expand Up @@ -1174,6 +1162,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuRlnRelay.isNil():
await node.wakuRlnRelay.stop()

if not node.wakuArchive.isNil():
await node.wakuArchive.stop()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async.} =
Expand Down
45 changes: 17 additions & 28 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type
driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk
validator: MessageValidator
retentionPolicy: RetentionPolicy
retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future

proc new*(T: type WakuArchive,
driver: ArchiveDriver,
Expand Down Expand Up @@ -189,19 +190,25 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
return ok(ArchiveResponse(messages: messages, cursor: cursor))

# Retention policy
const WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)

proc loopApplyRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =

proc executeMessageRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.retentionPolicy.isNil():
return err("retentionPolicy is Nil in executeMessageRetentionPolicy")

if w.driver.isNil():
return err("driver is Nil in executeMessageRetentionPolicy")

let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
return err("failed execution of retention policy: " & retPolicyRes.error)
while true:
debug "executing message retention policy"
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=retPolicyRes.error

await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)

return ok()

Expand All @@ -218,26 +225,8 @@ proc reportStoredMessagesMetric*(w: WakuArchive):

return ok()

proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive,
interval: timer.Duration) =
# Start the periodic message retention policy task
# https://github.com/nim-lang/Nim/issues/17369

var executeRetentionPolicy: CallbackFunc
executeRetentionPolicy =
CallbackFunc(
proc (arg: pointer) {.gcsafe, raises: [].} =
try:
let retPolRes = waitFor w.executeMessageRetentionPolicy()
if retPolRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "error in periodic retention policy", error = retPolRes.error
except CatchableError:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "exception in periodic retention policy",
error = getCurrentExceptionMsg()

discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
)
proc start*(self: WakuArchive) {.async.} =
self.retPolicyFut = self.loopApplyRetentionPolicy()

discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
proc stop*(self: WakuArchive) {.async.} =
self.retPolicyFut.cancel()

0 comments on commit 77c5ba7

Please sign in to comment.