Skip to content

Commit

Permalink
test: attempt to fix event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Feb 14, 2024
1 parent 1f9283a commit ba3bb13
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 99 deletions.
10 changes: 0 additions & 10 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use sscanf::{scanf, RegexRepresentation};
pub type WakuMessageVersion = usize;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Waku pubsub topic
pub type WakuPubSubTopic = String;

/// Waku response, just a `Result` with an `String` error.
pub type Result<T> = std::result::Result<T, String>;
Expand All @@ -29,7 +27,6 @@ pub struct WakuMessage {
payload: Vec<u8>,
/// The content topic to be set on the message
content_topic: WakuContentTopic,
// TODO: check if missing default should be 0
/// The Waku Message version number
#[serde(default)]
version: WakuMessageVersion,
Expand Down Expand Up @@ -238,13 +235,6 @@ mod base64_serde {
#[cfg(test)]
mod tests {
use super::*;
use crate::WakuPubSubTopic;

#[test]
fn parse_waku_topic() {
let s = "/waku/2/default-waku/proto";
let _: WakuPubSubTopic = s.parse().unwrap();
}

#[test]
fn deserialize_waku_message() {
Expand Down
6 changes: 2 additions & 4 deletions waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use rln;

pub use node::{
waku_create_content_topic, waku_default_pubsub_topic, waku_new, Event, Key, Multiaddr,
PublicKey, SecretKey, Signal, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
PublicKey, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};

pub use general::{
Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
};
pub use general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion};
3 changes: 1 addition & 2 deletions waku-bindings/src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// std
// crates
use crate::WakuPubSubTopic;
use multiaddr::Multiaddr;
use secp256k1::SecretKey;
use serde::{Deserialize, Serialize};
Expand All @@ -29,7 +28,7 @@ pub struct WakuNodeConfig {
/// Enable relay protocol. Default `true`
#[default(Some(true))]
pub relay: Option<bool>,
pub relay_topics: Vec<WakuPubSubTopic>,
pub relay_topics: Vec<String>,
// /// Enable store protocol to persist message history
// #[default(Some(false))]
// pub store: Option<bool>,
Expand Down
65 changes: 23 additions & 42 deletions waku-bindings/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,25 @@
//!
//! Asynchronous events require a callback to be registered.
//! An example of an asynchronous event that might be emitted is receiving a message.
//! When an event is emitted, this callback will be triggered receiving a [`Signal`]
//! When an event is emitted, this callback will be triggered receiving an [`Event`]
// std
use std::ffi::c_void;
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::general::{WakuMessage, WakuPubSubTopic};
use crate::general::WakuMessage;
use crate::utils::get_trampoline;
use crate::MessageId;
use waku_sys::WakuCallBack;

/// Event signal
#[derive(Serialize, Deserialize)]
pub struct Signal {
/// Type of signal being emitted. Currently, only message is available
#[serde(alias = "type")]
_type: String,
/// Format depends on the type of signal
event: Event,
}

impl Signal {
pub fn event(&self) -> &Event {
&self.event
}
}

/// Waku event
/// For now just WakuMessage is supported
#[non_exhaustive]
#[derive(Serialize, Deserialize)]
#[serde(untagged, rename_all = "camelCase")]
#[serde(tag = "eventType", rename_all = "camelCase")]
pub enum Event {
#[serde(rename = "message")]
WakuMessage(WakuMessageEvent),
Unrecognized(serde_json::Value),
}
Expand All @@ -45,15 +30,15 @@ pub enum Event {
#[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
pubsub_topic: WakuPubSubTopic,
pubsub_topic: String,
/// The message id
message_id: MessageId,
/// The message in [`WakuMessage`] format
waku_message: WakuMessage,
}

impl WakuMessageEvent {
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
pub fn pubsub_topic(&self) -> &String {
&self.pubsub_topic
}

Expand All @@ -66,43 +51,39 @@ impl WakuMessageEvent {
}
}

/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
fn callback<F: FnMut(Signal) + Send + Sync>(mut f: F) -> WakuCallBack {
let cb = |v: &str| {
let data: Signal = serde_json::from_str(v).expect("Parsing signal to succeed");
/// Wrapper callback, it transformst the `*const c_char` into an [`Event`]
fn callback<F: FnMut(Event) + Send + Sync>(mut f: F) -> WakuCallBack {
let cb = move |v: &str| {
let data: Event = serde_json::from_str(v).expect("Parsing event to succeed");
println!("EXEC CALLBACK")

Check failure on line 58 in waku-bindings/src/node/events.rs

View workflow job for this annotation

GitHub Actions / Rust lints

expected `;`, found `f`

Check failure on line 58 in waku-bindings/src/node/events.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

expected `;`, found `f`

Check failure on line 58 in waku-bindings/src/node/events.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

expected `;`, found `f`

Check failure on line 58 in waku-bindings/src/node/events.rs

View workflow job for this annotation

GitHub Actions / Test

expected `;`, found `f`
f(data);
println!("SUCCESS!");
};

get_trampoline(&cb)
}

/// Register callback to act as event handler and receive application signals,
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync>(ctx: *mut c_void, f: F) {
// <F: FnMut(Signal) + Send + Sync + 'static> , , f: F
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: *mut c_void, f: F) {
unsafe { waku_sys::waku_set_event_callback(ctx, callback(f), std::ptr::null_mut()) };
}

#[cfg(test)]
mod tests {
/*use crate::events::waku_set_event_callback;
use crate::{Event, Signal};
use crate::node::events::callback;
use crate::{Event, WakuMessageEvent};

Check warning on line 75 in waku-bindings/src/node/events.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `WakuMessageEvent`

// TODO: how to actually send a signal and check if the callback is run?
// TODO: how to actually send an event and check if the callback is run?
#[test]
fn set_event_callback() {
waku_set_event_callback(|_signal| {});
fn set_callback() {
callback(|_event| {});
}

#[test]
fn deserialize_signal() {
let s = "{\"type\":\"message\",\"event\":{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}}";
let _: Signal = serde_json::from_str(s).unwrap();
fn deserialize_message_event() {
let s = "{\"eventType\":\"message\",\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let evt: Event = serde_json::from_str(s).unwrap();
assert!(matches!(evt, Event::WakuMessage(_)));
}
#[test]
fn deserialize_event() {
let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let _: Event = serde_json::from_str(e).unwrap();
}*/
}
12 changes: 6 additions & 6 deletions waku-bindings/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use std::time::Duration;
use libc::c_void;
// internal

use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
use crate::general::{MessageId, Result, WakuMessage};

pub use config::WakuNodeConfig;
pub use events::{Event, Signal, WakuMessageEvent};
pub use events::{Event, WakuMessageEvent};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};

/// Handle to the underliying waku node
Expand Down Expand Up @@ -54,23 +54,23 @@ impl WakuNodeHandle {
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &WakuPubSubTopic,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_message(self.ctx, message, pubsub_topic, timeout)
}

/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> {
relay::waku_relay_subscribe(self.ctx, pubsub_topic)
}

/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
relay::waku_relay_unsubscribe(self.ctx, pubsub_topic)
}

pub fn set_event_callback<F: FnMut(Signal) + Send + Sync>(&self, f: F) {
pub fn set_event_callback<F: FnMut(Event) + Send + Sync + 'static>(&self, f: F) {
events::waku_set_event_callback(self.ctx, f)
}
}
Expand Down
12 changes: 6 additions & 6 deletions waku-bindings/src/node/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};

Check warning on line 10 in waku-bindings/src/node/relay.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

unused import: `handle_json_response`

Check warning on line 10 in waku-bindings/src/node/relay.rs

View workflow job for this annotation

GitHub Actions / Test Suite (ubuntu-latest, stable-x86_64-unknown-linux-gnu)

unused import: `handle_json_response`

Check warning on line 10 in waku-bindings/src/node/relay.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `handle_json_response`

/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
Expand Down Expand Up @@ -57,7 +57,7 @@ pub fn waku_create_content_topic(

/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic {
pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> String {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
Expand All @@ -74,7 +74,7 @@ pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic {
pub fn waku_relay_publish_message(
ctx: *mut c_void,
message: &WakuMessage,
pubsub_topic: &WakuPubSubTopic,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic.to_string();
Expand All @@ -96,8 +96,8 @@ pub fn waku_relay_publish_message(
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_publish(
ctx,
message_ptr,
pubsub_topic_ptr,
message_ptr,
timeout
.map(|duration| {
duration
Expand All @@ -119,7 +119,7 @@ pub fn waku_relay_publish_message(
handle_response(code, &result)
}

pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
Expand All @@ -145,7 +145,7 @@ pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) ->
handle_no_response(code, &error)
}

pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
Expand Down
Loading

0 comments on commit ba3bb13

Please sign in to comment.