Skip to content

Commit

Permalink
refactor: node handle constructor and messageHash on publish (#98)
Browse files Browse the repository at this point in the history
* refactor: node handle constructor and messageId on publish
* refactor: add back typestate
* chore: rename messageId to messageHash
  • Loading branch information
richard-ramos authored Mar 1, 2024
1 parent a10a5c2 commit 69a4872
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 57 deletions.
4 changes: 2 additions & 2 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use sscanf::{scanf, RegexRepresentation};

/// Waku message version
pub type WakuMessageVersion = usize;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Waku message hash, hex encoded sha256 digest of the message
pub type MessageHash = String;

/// Waku response, just a `Result` with an `String` error.
pub type Result<T> = std::result::Result<T, String>;
Expand Down
8 changes: 5 additions & 3 deletions waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ mod utils;
use rln;

pub use node::{
waku_create_content_topic, waku_new, Event, Key, Multiaddr, PublicKey, SecretKey,
WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, Running,
SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};

pub use general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion};
pub use general::{
Encoding, MessageHash, Result, WakuContentTopic, WakuMessage, WakuMessageVersion,
};
19 changes: 8 additions & 11 deletions waku-bindings/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use crate::general::WakuMessage;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, LibwakuResponse};
use crate::MessageId;
use crate::MessageHash;

/// Waku event
/// For now just WakuMessage is supported
Expand All @@ -31,8 +31,8 @@ pub enum Event {
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
pub pubsub_topic: String,
/// The message id
pub message_id: MessageId,
/// The message hash
pub message_hash: MessageHash,
/// The message in [`WakuMessage`] format
pub waku_message: WakuMessage,
}
Expand All @@ -41,13 +41,10 @@ pub struct WakuMessageEvent {
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: &WakuNodeContext, mut f: F) {
let cb = |response: LibwakuResponse| {
match response {
LibwakuResponse::Success(v) => {
let data: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
f(data);
}
_ => {} // Do nothing
if let LibwakuResponse::Success(v) = response {
let data: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
f(data);
};
};

Expand All @@ -65,7 +62,7 @@ mod tests {

#[test]
fn deserialize_message_event() {
let s = "{\"eventType\":\"message\",\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let s = "{\"eventType\":\"message\",\"messageHash\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let evt: Event = serde_json::from_str(s).unwrap();
assert!(matches!(evt, Event::WakuMessage(_)));
}
Expand Down
55 changes: 39 additions & 16 deletions waku-bindings/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,62 @@ mod relay;
pub use aes_gcm::Key;
pub use multiaddr::Multiaddr;
pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::{Result, WakuMessage};
use crate::general::{MessageHash, Result, WakuMessage};
use context::WakuNodeContext;

pub use config::WakuNodeConfig;
pub use events::{Event, WakuMessageEvent};
pub use relay::waku_create_content_topic;

/// Marker trait to disallow undesired waku node states in the handle
pub trait WakuNodeState {}

/// Waku node initialized state
pub struct Initialized;

/// Waku node running state
pub struct Running;

impl WakuNodeState for Initialized {}
impl WakuNodeState for Running {}

/// Handle to the underliying waku node
pub struct WakuNodeHandle {
pub struct WakuNodeHandle<State: WakuNodeState> {
ctx: WakuNodeContext,
phantom: PhantomData<State>,
}

impl WakuNodeHandle {
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
Ok(WakuNodeHandle {
ctx: management::waku_new(config)?,
phantom: PhantomData,
})
}

impl WakuNodeHandle<Initialized> {
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn start(&self) -> Result<()> {
management::waku_start(&self.ctx)
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start(&self.ctx).map(|_| WakuNodeHandle {
ctx: self.ctx,
phantom: PhantomData,
})
}
}

impl WakuNodeHandle<Running> {
/// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn stop(&self) -> Result<()> {
management::waku_stop(&self.ctx)
pub fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
management::waku_stop(&self.ctx).map(|_| WakuNodeHandle {
ctx: self.ctx,
phantom: PhantomData,
})
}

/// Get the multiaddresses the Waku node is listening to
Expand Down Expand Up @@ -66,7 +97,7 @@ impl WakuNodeHandle {
message: &WakuMessage,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<()> {
) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
}

Expand All @@ -84,11 +115,3 @@ impl WakuNodeHandle {
events::waku_set_event_callback(&self.ctx, f)
}
}

/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle> {
Ok(WakuNodeHandle {
ctx: management::waku_new(config)?,
})
}
6 changes: 3 additions & 3 deletions waku-bindings/src/node/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, Result, WakuContentTopic, WakuMessage};
use crate::general::{Encoding, MessageHash, Result, WakuContentTopic, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};

Expand Down Expand Up @@ -62,7 +62,7 @@ pub fn waku_relay_publish_message(
message: &WakuMessage,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<()> {
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();

let message_ptr = CString::new(
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn waku_relay_publish_message(
out
};

handle_no_response(code, result)
handle_response(code, result)
}

pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
Expand Down
7 changes: 5 additions & 2 deletions waku-bindings/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
let opt_value = Some(response.to_string()).filter(|s| !s.is_empty());
match ret_code {
RET_OK => Ok(LibwakuResponse::Success(opt_value)),
RET_ERR => Ok(LibwakuResponse::Failure(format!("waku error: {}", response))),
RET_ERR => Ok(LibwakuResponse::Failure(format!(
"waku error: {}",
response
))),
RET_MISSING_CALLBACK => Ok(LibwakuResponse::MissingCallback),
_ => Err(format!("undefined return code {}", ret_code)),
}
Expand Down Expand Up @@ -98,7 +101,7 @@ pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result
LibwakuResponse::Success(v) => v
.unwrap_or_default()
.parse()
.map_err(|_| format!("could not parse value")),
.map_err(|_| "could not parse value".into()),
LibwakuResponse::Failure(v) => Err(v),
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Undefined => panic!(
Expand Down
45 changes: 26 additions & 19 deletions waku-bindings/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,43 @@ use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::{str::from_utf8};
use std::{collections::HashSet, str::from_utf8};
use tokio::sync::broadcast::{self, Sender};
use tokio::time;
use tokio::time::sleep;
use waku_bindings::{
waku_new, Encoding, Event, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig,
waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, WakuNodeConfig,
WakuNodeHandle,
};
const ECHO_TIMEOUT: u64 = 10;
const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test";

fn try_publish_relay_messages(
node: &WakuNodeHandle,
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<(), String> {
) -> Result<HashSet<MessageHash>, String> {
let topic = TEST_PUBSUBTOPIC.to_string();
Ok(node.relay_publish_message(msg, &topic, None)?)
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
}

#[derive(Debug, Clone)]
struct Response {
id: MessageId,
hash: MessageHash,
payload: Vec<u8>,
}

fn set_callback(node: &WakuNodeHandle, tx: Sender<Response>) {
fn set_callback(node: &WakuNodeHandle<Running>, tx: Sender<Response>) {
node.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let id = message.message_id;
let hash = message.message_hash;
let message = message.waku_message;
let payload = message.payload.to_vec();

tx.send(Response {
id: id.to_string(),
hash: hash.to_string(),
payload,
})
.expect("send response to the receiver");
Expand All @@ -45,8 +47,8 @@ fn set_callback(node: &WakuNodeHandle, tx: Sender<Response>) {
}

async fn test_echo_messages(
node1: &WakuNodeHandle,
node2: &WakuNodeHandle,
node1: &WakuNodeHandle<Running>,
node2: &WakuNodeHandle<Running>,
content: &'static str,
content_topic: WakuContentTopic,
) {
Expand All @@ -69,11 +71,16 @@ async fn test_echo_messages(
let (tx, mut rx) = broadcast::channel(1);
set_callback(node2, tx);

try_publish_relay_messages(node1, &message).expect("send relay messages");

let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
while let Ok(res) = rx.recv().await {
assert!(!res.id.is_empty());
from_utf8(&res.payload).expect("should be valid message");
if ids.take(&res.hash).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}

if ids.is_empty() {
break;
}
}
}

Expand All @@ -89,8 +96,8 @@ async fn default_echo() -> Result<(), String> {
..Default::default()
}))?;

node1.start()?;
node2.start()?;
let node1 = node1.start()?;
let node2 = node2.start()?;

let addresses1 = node1.listen_addresses()?;
node2.connect(&addresses1[0], None)?;
Expand All @@ -101,7 +108,7 @@ async fn default_echo() -> Result<(), String> {
node2.relay_subscribe(&topic)?;

// Wait for mesh to form
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(3)).await;

let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);

Expand Down Expand Up @@ -136,7 +143,7 @@ fn node_restart() {
for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid");

node.start().expect("node should start with valid config");
let node = node.start().expect("node should start with valid config");

node.stop().expect("node should stop");
}
Expand Down
2 changes: 1 addition & 1 deletion waku-sys/vendor
Submodule vendor updated 107 files

0 comments on commit 69a4872

Please sign in to comment.