Skip to content

Commit

Permalink
Merge 7b0ed28 into 72f9066
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Sep 18, 2023
2 parents 72f9066 + 7b0ed28 commit 48165c1
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 37 deletions.
33 changes: 25 additions & 8 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import
import
chronicles,
chronos,
chronos/threadsync,
taskpools/channels_spsc_single,
stew/results,
stew/shims/net,
taskpools/channels_spsc_single
stew/shims/net
import
../../../waku/node/waku_node,
../events/[json_error_event,json_message_event,json_base_event],
Expand All @@ -21,7 +22,9 @@ type
Context* = object
thread: Thread[(ptr Context)]
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
reqSignal: ThreadSignalPtr
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr

var ctx {.threadvar.}: ptr Context

Expand Down Expand Up @@ -52,6 +55,7 @@ proc run(ctx: ptr Context) {.thread.} =
## Trying to get a request from the libwaku main thread

var request: ptr InterThreadRequest
waitFor ctx.reqSignal.wait()
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse =
Expand All @@ -61,9 +65,8 @@ proc run(ctx: ptr Context) {.thread.} =
let threadSafeResp = InterThreadResponse.createShared(resultResponse)

## The error-handling is performed in the main thread
discard ctx.respChannel.trySend( threadSafeResp )

waitFor sleepAsync(1)
discard ctx.respChannel.trySend(threadSafeResp)
discard ctx.respSignal.fireSync()

tearDownForeignThreadGc()

Expand All @@ -74,6 +77,10 @@ proc createWakuThread*(): Result[void, string] =
waku_init()

ctx = createShared(Context, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.respSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create respSignal ThreadSignalPtr")

running.store(true)

Expand All @@ -90,6 +97,8 @@ proc createWakuThread*(): Result[void, string] =
proc stopWakuNodeThread*() =
running.store(false)
joinThread(ctx.thread)
discard ctx.reqSignal.close()
discard ctx.respSignal.close()
freeShared(ctx)

proc sendRequestToWakuThread*(reqType: RequestType,
Expand All @@ -102,12 +111,20 @@ proc sendRequestToWakuThread*(reqType: RequestType,
if not sentOk:
return err("Couldn't send a request to the waku thread: " & $req[])

let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
return err("failed fireSync: " & $fireSyncRes.error)

if fireSyncRes.get() == false:
return err("Couldn't fireSync in time")

## Waiting for the response
waitFor ctx.respSignal.wait()

var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
while recvOk == false:
recvOk = ctx.respChannel.tryRecv(response)
os.sleep(1)
if recvOk == false:
return err("Couldn't receive response from the waku thread: " & $req[])

## Converting the thread-safe response into a managed/CG'ed `Result`
return InterThreadResponse.process(response)
17 changes: 7 additions & 10 deletions waku/node/waku_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ const LogInterval = 30.seconds
logScope:
topics = "waku node metrics"


type
# https://github.com/nim-lang/Nim/issues/17369
MetricsLogger = proc(udata: pointer) {.gcsafe, raises: [Defect].}

proc startMetricsLog*() =
var logMetrics: MetricsLogger
var logMetrics: CallbackFunc

var cumulativeErrors = 0.float64
var cumulativeConns = 0.float64

let logRlnMetrics = getRlnMetricsLogger()

logMetrics = proc(udata: pointer) =
{.gcsafe.}:
logMetrics = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =

# TODO: libp2p_pubsub_peers is not public, so we need to make this either
# public in libp2p or do our own peer counting after all.

Expand All @@ -62,6 +58,7 @@ proc startMetricsLog*() =
# Start protocol specific metrics logging
logRlnMetrics()

discard setTimer(Moment.fromNow(LogInterval), logMetrics)
discard setTimer(Moment.fromNow(LogInterval), logMetrics)
)

discard setTimer(Moment.fromNow(LogInterval), logMetrics)
discard setTimer(Moment.fromNow(LogInterval), logMetrics)
32 changes: 17 additions & 15 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,21 @@ proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive,
# Start the periodic message retention policy task
# https://github.com/nim-lang/Nim/issues/17369

var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =

try:
let retPolRes = waitFor w.executeMessageRetentionPolicy()
if retPolRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "error in periodic retention policy", error = retPolRes.error
except CatchableError:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "exception in periodic retention policy",
error = getCurrentExceptionMsg()

discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
var executeRetentionPolicy: CallbackFunc
executeRetentionPolicy =
CallbackFunc(
proc (arg: pointer) {.gcsafe, raises: [].} =
try:
let retPolRes = waitFor w.executeMessageRetentionPolicy()
if retPolRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "error in periodic retention policy", error = retPolRes.error
except CatchableError:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "exception in periodic retention policy",
error = getCurrentExceptionMsg()

discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
)

discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
10 changes: 6 additions & 4 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,12 @@ const MaintainSubscriptionsInterval* = 1.minutes

proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) =
trace "starting to maintain subscriptions"
var maintainSubs: proc(udata: pointer) {.gcsafe, raises: [Defect].}
maintainSubs = proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
var maintainSubs: CallbackFunc
maintainSubs = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
)

wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)

Expand Down

0 comments on commit 48165c1

Please sign in to comment.