From f7ab24e03d1809a5648a72868b287a8b8d76c44e Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 18 Sep 2023 16:55:08 +0200 Subject: [PATCH] waku_thread.nim: Using 'ThreadSignalPtr' instead of loop to handle req/resp * Updating 'nim-chronos' so that the 'chronos/threadsync' module is available. * Also adding changes in waku_metrics.nim, archive.nim, and waku_filter_v2/protocol.nim so that they compile after the 'nim-chronos' update. --- library/waku_thread/waku_thread.nim | 33 ++++++++++++++++++++++------- waku/node/waku_metrics.nim | 2 +- waku/waku_archive/archive.nim | 2 +- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index c9a4051177..a2965e4dd2 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -8,9 +8,10 @@ import import chronicles, chronos, + chronos/threadsync, + taskpools/channels_spsc_single, stew/results, - stew/shims/net, - taskpools/channels_spsc_single + stew/shims/net import ../../../waku/node/waku_node, ../events/[json_error_event,json_message_event,json_base_event], @@ -21,7 +22,9 @@ type Context* = object thread: Thread[(ptr Context)] reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] + reqSignal: ThreadSignalPtr respChannel: ChannelSPSCSingle[ptr InterThreadResponse] + respSignal: ThreadSignalPtr var ctx {.threadvar.}: ptr Context @@ -52,6 +55,7 @@ proc run(ctx: ptr Context) {.thread.} = ## Trying to get a request from the libwaku main thread var request: ptr InterThreadRequest + waitFor ctx.reqSignal.wait() let recvOk = ctx.reqChannel.tryRecv(request) if recvOk == true: let resultResponse = @@ -61,9 +65,8 @@ proc run(ctx: ptr Context) {.thread.} = let threadSafeResp = InterThreadResponse.createShared(resultResponse) ## The error-handling is performed in the main thread - discard ctx.respChannel.trySend( threadSafeResp ) - - waitFor sleepAsync(1) + discard ctx.respChannel.trySend(threadSafeResp) + discard ctx.respSignal.fireSync() tearDownForeignThreadGc() @@ -74,6 +77,10 @@ proc createWakuThread*(): Result[void, string] = waku_init() ctx = createShared(Context, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.respSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create respSignal ThreadSignalPtr") running.store(true) @@ -90,6 +97,8 @@ proc createWakuThread*(): Result[void, string] = proc stopWakuNodeThread*() = running.store(false) joinThread(ctx.thread) + discard ctx.reqSignal.close() + discard ctx.respSignal.close() freeShared(ctx) proc sendRequestToWakuThread*(reqType: RequestType, @@ -102,12 +111,20 @@ proc sendRequestToWakuThread*(reqType: RequestType, if not sentOk: return err("Couldn't send a request to the waku thread: " & $req[]) + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + return err("Couldn't fireSync in time") + ## Waiting for the response + waitFor ctx.respSignal.wait() + var response: ptr InterThreadResponse var recvOk = ctx.respChannel.tryRecv(response) - while recvOk == false: - recvOk = ctx.respChannel.tryRecv(response) - os.sleep(1) + if recvOk == false: + return err("Couldn't receive response from the waku thread: " & $req[]) ## Converting the thread-safe response into a managed/CG'ed `Result` return InterThreadResponse.process(response) diff --git a/waku/node/waku_metrics.nim b/waku/node/waku_metrics.nim index 140b55e7a2..357694e728 100644 --- a/waku/node/waku_metrics.nim +++ b/waku/node/waku_metrics.nim @@ -61,4 +61,4 @@ proc startMetricsLog*() = discard setTimer(Moment.fromNow(LogInterval), logMetrics) ) - discard setTimer(Moment.fromNow(LogInterval), logMetrics) + discard setTimer(Moment.fromNow(LogInterval), logMetrics) \ No newline at end of file diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index b6e8aad3fa..5dcaf3f9ee 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -238,4 +238,4 @@ proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive, discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) ) - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) \ No newline at end of file