diff --git a/library/events/json_topic_health_change_event.nim b/library/events/json_topic_health_change_event.nim new file mode 100644 index 0000000000..c735eccbf1 --- /dev/null +++ b/library/events/json_topic_health_change_event.nim @@ -0,0 +1,23 @@ +import system, results, std/json +import stew/byteutils +import ../../waku/common/base64, ./json_base_event +import ../../waku/waku_relay + +type JsonTopicHealthChangeEvent* = ref object of JsonEvent + pubsubTopic*: string + topicHealth*: TopicHealth + +proc new*( + T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth +): T = + # Returns a TopicHealthChange event as indicated in + # https://rfc.vac.dev/spec/36/#jsonmessageevent-type + + return JsonTopicHealthChangeEvent( + eventType: "relay_topic_health_change", + pubsubTopic: pubsubTopic, + topicHealth: topicHealth, + ) + +method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string = + $(%*jsonTopicHealthChange) diff --git a/library/libwaku.nim b/library/libwaku.nim index 13022f879a..d617c00bb8 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -13,8 +13,8 @@ import waku/node/waku_node, waku/waku_core/topics/pubsub_topic, waku/waku_core/subscription/push_handler, - waku/waku_relay/protocol, - ./events/json_message_event, + waku/waku_relay, + ./events/[json_message_event, json_topic_health_change_event], ./waku_thread/waku_thread, ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, ./waku_thread/inter_thread_communication/requests/peer_manager_request, @@ -84,6 +84,31 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData ) +proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler = + return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} = + # Callback that hadles the Waku Relay events. i.e. messages or errors. + if isNil(ctx[].eventCallback): + error "onTopicHealthChange - eventCallback is nil" + return + + if isNil(ctx[].eventUserData): + error "onTopicHealthChange - eventUserData is nil" + return + + foreignThreadGc: + try: + let event = $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) + cast[WakuCallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception onTopicHealthChange when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[WakuCallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + ### End of not-exported components ################################################################################ @@ -139,7 +164,10 @@ proc waku_new( ctx.userData = userData - let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx)) + let appCallbacks = AppCallbacks( + relayHandler: onReceivedMessage(ctx), + topicHealthChangeHandler: onTopicHealthChange(ctx), + ) let retCode = handleRequest( ctx, diff --git a/waku/factory/app_callbacks.nim b/waku/factory/app_callbacks.nim index ffab59c240..1bcd6cc0ef 100644 --- a/waku/factory/app_callbacks.nim +++ b/waku/factory/app_callbacks.nim @@ -1,4 +1,5 @@ -import ../waku_relay/protocol +import ../waku_relay type AppCallbacks* = ref object relayHandler*: WakuRelayHandler + topicHealthChangeHandler*: TopicHealthChangeHandler diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 4fed2f1dca..4ad094dff4 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -169,7 +169,13 @@ proc setupAppCallbacks( for shard in shards: discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler) - return ok() + if not appCallbacks.topicHealthChangeHandler.isNil(): + if node.wakuRelay.isNil(): + return + err("Cannot configure topicHealthChangeHandler callback without Relay mounted") + node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler + + return ok() proc new*( T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil diff --git a/waku/waku_relay.nim b/waku/waku_relay.nim index ac35a90451..96328d984b 100644 --- a/waku/waku_relay.nim +++ b/waku/waku_relay.nim @@ -1,3 +1,3 @@ -import ./waku_relay/protocol +import ./waku_relay/[protocol, topic_health] -export protocol +export protocol, topic_health diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 966f6a4849..1d79e73366 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -18,7 +18,8 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, libp2p/switch -import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer +import + ../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer from ../waku_core/codecs import WakuRelayCodec export WakuRelayCodec @@ -131,6 +132,9 @@ type # a map of validators to error messages to return when validation fails validatorInserted: Table[PubsubTopic, bool] publishObservers: seq[PublishObserver] + topicsHealth*: Table[string, TopicHealth] + onTopicHealthChange*: TopicHealthChangeHandler + topicHealthLoopHandle*: Future[void] proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = @@ -289,6 +293,7 @@ proc new*( procCall GossipSub(w).initPubSub() w.initProtocolHandler() w.initRelayObservers() + w.topicsHealth = initTable[string, TopicHealth]() except InitializationError: return err("initialization error: " & getCurrentExceptionMsg()) @@ -309,13 +314,78 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = ## Observes when a message is sent/received from the GossipSub PoV procCall GossipSub(w).addObserver(observer) +proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = + ## Returns the number of peers in a mesh defined by the passed pubsub topic. + ## The 'mesh' atribute is defined in the GossipSub ref object. + + if not w.mesh.hasKey(pubsubTopic): + debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic", + pubsubTopic = pubsubTopic + return ok(0) + + let peersRes = catch: + w.mesh[pubsubTopic] + + let peers: HashSet[PubSubPeer] = peersRes.valueOr: + return + err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) + + return ok(peers.len) + +proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth = + let numPeersInMesh = wakuRelay.getNumPeersInMesh(topic).valueOr: + error "Could not calculate topic health", topic = topic, error = error + return TopicHealth.UNHEALTHY + + if numPeersInMesh < 1: + return TopicHealth.UNHEALTHY + elif numPeersInMesh < wakuRelay.parameters.dLow: + return TopicHealth.MINIMALLY_HEALTHY + return TopicHealth.SUFFICIENTLY_HEALTHY + +proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} = + var futs = newSeq[Future[void]]() + for topic in toSeq(wakuRelay.topics.keys): + ## loop over all the topics I'm subscribed to + let + oldHealth = wakuRelay.topicsHealth.getOrDefault(topic) + currentHealth = wakuRelay.calculateTopicHealth(topic) + + if oldHealth == currentHealth: + continue + + wakuRelay.topicsHealth[topic] = currentHealth + if not wakuRelay.onTopicHealthChange.isNil(): + let fut = wakuRelay.onTopicHealthChange(topic, currentHealth) + if not fut.completed(): # Fast path for successful sync handlers + futs.add(fut) + + if futs.len() > 0: + # slow path - we have to wait for the handlers to complete + try: + futs = await allFinished(futs) + except CancelledError: + # check for errors in futures + for fut in futs: + if fut.failed: + let err = fut.readError() + warn "Error in health change handler", description = err.msg + +proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} = + while true: + await wakuRelay.updateTopicsHealth() + await sleepAsync(10.seconds) + method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start() + w.topicHealthLoopHandle = w.topicsHealthLoop() method stop*(w: WakuRelay) {.async, base.} = debug "stop" await procCall GossipSub(w).stop() + if not w.topicHealthLoopHandle.isNil(): + await w.topicHealthLoopHandle.cancelAndWait() proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = GossipSub(w).topics.hasKey(topic) @@ -455,25 +525,6 @@ proc publish*( return relayedPeerCount -proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = - ## Returns the number of peers in a mesh defined by the passed pubsub topic. - ## The 'mesh' atribute is defined in the GossipSub ref object. - - if not w.mesh.hasKey(pubsubTopic): - return err( - "getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " & - pubsubTopic - ) - - let peersRes = catch: - w.mesh[pubsubTopic] - - let peers: HashSet[PubSubPeer] = peersRes.valueOr: - return - err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) - - return ok(peers.len) - proc getNumConnectedPeers*( w: WakuRelay, pubsubTopic: PubsubTopic ): Result[int, string] = diff --git a/waku/waku_relay/topic_health.nim b/waku/waku_relay/topic_health.nim new file mode 100644 index 0000000000..774abc584b --- /dev/null +++ b/waku/waku_relay/topic_health.nim @@ -0,0 +1,19 @@ +import chronos + +import ../waku_core + +type TopicHealth* = enum + UNHEALTHY + MINIMALLY_HEALTHY + SUFFICIENTLY_HEALTHY + +proc `$`*(t: TopicHealth): string = + result = + case t + of UNHEALTHY: "UnHealthy" + of MINIMALLY_HEALTHY: "MinimallyHealthy" + of SUFFICIENTLY_HEALTHY: "SufficientlyHealthy" + +type TopicHealthChangeHandler* = proc( + pubsubTopic: PubsubTopic, topicHealth: TopicHealth +): Future[void] {.gcsafe, raises: [Defect].}