-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: topic health tracking #3212
Conversation
You can find the image built from this PR at
Built from fe2a18c |
topicsHealth*: Table[string, TopicHealth] | ||
onTopicHealthChange*: TopicHealthChangeHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added these two parameters + wakuRelay
for feature-parity with go-waku's peer manager
relay -> wakuRelay
subRelayTopics -> topicsHealth
TopicHealthNotifCh -> onTopicHealthChange
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think onTopicHealthChange
should also belong only to WakuRelay
if futs.len() > 0: | ||
proc waiter(): Future[void] {.async.} = | ||
# slow path - we have to wait for the handlers to complete | ||
try: | ||
futs = await allFinished(futs) | ||
except CancelledError: | ||
# propagate cancellation | ||
for fut in futs: | ||
if not (fut.finished): | ||
await fut.cancelAndWait() | ||
|
||
# check for errors in futures | ||
for fut in futs: | ||
if fut.failed: | ||
let err = fut.readError() | ||
warn "Error in health change handler", description = err.msg | ||
|
||
return waiter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part looks horrible honestly - copied it from nim-libp2p's handling of Relay callback events
It's in part necessary because the callbacks configured can be async
@@ -914,6 +967,7 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = | |||
await pm.manageRelayPeers() | |||
else: | |||
await pm.connectToRelayPeers() | |||
discard pm.updateTopicsHealth() # we don't want to await for the callbacks to finish |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discarding because the completion of the app callbacks shouldn't stop the peer manager from establishing new connections
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, I think we can have a separate async
loop but running within the topic_healt.nim
module
proc `$`*(t: TopicHealth): string = | ||
result = | ||
case t | ||
of UNHEALTHY: "UnHealthy" | ||
of MINIMALLY_HEALTHY: "MinimallyHealthy" | ||
of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work, sir!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
I've added some comments that I hope you find useful. IMO, we can have a simpler solution without even touching PeerManager
:)
Great work so far 🙌
topicsHealth*: Table[string, TopicHealth] | ||
onTopicHealthChange*: TopicHealthChangeHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think onTopicHealthChange
should also belong only to WakuRelay
let fut = pm.onTopicHealthChange(topic, currentHealth) | ||
if not fut.completed(): # Fast path for successful sync handlers | ||
futs.add(fut) | ||
|
||
if futs.len() > 0: | ||
proc waiter(): Future[void] {.async.} = | ||
# slow path - we have to wait for the handlers to complete | ||
try: | ||
futs = await allFinished(futs) | ||
except CancelledError: | ||
# propagate cancellation | ||
for fut in futs: | ||
if not (fut.finished): | ||
await fut.cancelAndWait() | ||
|
||
# check for errors in futures | ||
for fut in futs: | ||
if fut.failed: | ||
let err = fut.readError() | ||
warn "Error in health change handler", description = err.msg | ||
|
||
return waiter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't just invoke the callbacks in this point? Sorry but I can't quite see the need of this part
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I updated it, took it from the callback handling of nim-libp2p
but I agree we can just await for the callbacks to be finished and not define this other waiter proc
@@ -914,6 +967,7 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = | |||
await pm.manageRelayPeers() | |||
else: | |||
await pm.connectToRelayPeers() | |||
discard pm.updateTopicsHealth() # we don't want to await for the callbacks to finish |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, I think we can have a separate async
loop but running within the topic_healt.nim
module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for it! 🙌
c9977e4
to
55e4019
Compare
Description
Tracking topic health in the Peer Manager in the same way as in
go-waku
, and integrating the feature intolibwaku
Changes
topicHealthChangeHandler
app callback and setting it inlibwaku
topic_health.nim
file for topic health definitionsIssue
closes #3196