Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allowing configuration of application level callbacks #3206

Merged
merged 12 commits into from
Dec 13, 2024
9 changes: 7 additions & 2 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import
./waku_thread/inter_thread_communication/requests/ping_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc,
./ffi_types
./ffi_types,
../waku/factory/app_callbacks

################################################################################
### Wrapper around the waku node
Expand Down Expand Up @@ -138,10 +139,14 @@ proc waku_new(

ctx.userData = userData

let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))

let retCode = handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson),
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks
),
callback,
userData,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import
../../../../waku/factory/waku,
../../../../waku/factory/node_factory,
../../../../waku/factory/networks_config,
../../../../waku/factory/app_callbacks,
../../../alloc

type NodeLifecycleMsgType* = enum
Expand All @@ -17,20 +18,27 @@ type NodeLifecycleMsgType* = enum
type NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
appCallbacks: AppCallbacks

proc createShared*(
T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = ""
T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = "",
appCallbacks: AppCallbacks = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].appCallbacks = appCallbacks
ret[].configJson = configJson.alloc()
return ret

proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[Waku, string]] {.async.} =
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating node: " & error)

Expand Down Expand Up @@ -59,7 +67,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
formattedString & ". expected type: " & $typeof(confValue)
)

let wakuRes = Waku.new(conf).valueOr:
let wakuRes = Waku.new(conf, appCallbacks).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

Expand All @@ -73,7 +81,7 @@ proc process*(

case self.operation
of CREATE_NODE:
waku[] = (await createWaku(self.configJson)).valueOr:
waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr:
error "CREATE_NODE failed", error = error
return err("error processing createWaku request: " & $error)
of START_NODE:
Expand Down
4 changes: 4 additions & 0 deletions waku/factory/app_callbacks.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import ../waku_relay/protocol

type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
17 changes: 12 additions & 5 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)

proc getAutoshards*(
node: WakuNode, contentTopics: seq[string]
): Result[seq[RelayShard], string] =
var autoShards: seq[RelayShard]
for contentTopic in contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
return ok(autoshards)

proc setupProtocols(
node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey
): Future[Result[void, string]] {.async.} =
Expand Down Expand Up @@ -169,11 +179,8 @@ proc setupProtocols(

peerExchangeHandler = some(handlePeerExchange)

var autoShards: seq[RelayShard]
for contentTopic in conf.contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)

debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = autoShards
Expand Down
34 changes: 33 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import
../factory/node_factory,
../factory/internal_config,
../factory/external_config,
../factory/app_callbacks,
../waku_enr/multiaddr

logScope:
Expand All @@ -67,6 +68,7 @@ type Waku* = ref object

restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
appCallbacks*: AppCallbacks

proc logConfig(conf: WakuNodeConf) =
info "Configuration: Enabled protocols",
Expand Down Expand Up @@ -146,7 +148,32 @@ proc newCircuitRelay(isRelayClient: bool): Relay =
return RelayClient.new()
return Relay.new()

proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
proc setupAppCallbacks(
node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks
): Result[void, string] =
if appCallbacks.isNil():
info "No external callbacks to be set"
return ok()

if not appCallbacks.relayHandler.isNil():
if node.wakuRelay.isNil():
return err("Cannot configure relayHandler callback without Relay mounted")

let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)

let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards

for shard in shards:
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)

return ok()

proc new*(
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
): Result[Waku, string] =
let rng = crypto.newRng()

logging.setupLog(confCopy.logLevel, confCopy.logFormat)
Expand Down Expand Up @@ -225,6 +252,10 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =

let node = nodeRes.get()

node.setupAppCallbacks(confCopy, appCallbacks).isOkOr:
error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error)

## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled:
Expand All @@ -246,6 +277,7 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
key: confCopy.nodekey.get(),
node: node,
deliveryMonitor: deliveryMonitor,
appCallbacks: appCallbacks,
)

waku.setupSwitchServices(confCopy, relay, rng)
Expand Down
Loading