Skip to content

Commit

Permalink
new PubsubTopic type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Nov 27, 2024
1 parent 2fd169b commit 8cf8c41
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 44 deletions.
5 changes: 3 additions & 2 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage, WakuNodeConfig,
general::pubsubtopic::PubsubTopic, waku_new, Encoding, Event, LibwakuResponse,
WakuContentTopic, WakuMessage, WakuNodeConfig,
};

#[tokio::main]
Expand Down Expand Up @@ -73,7 +74,7 @@ async fn main() -> Result<(), Error> {

// ========================================================================
// Subscribe to pubsub topic
let topic = "test".to_string();
let topic = PubsubTopic::new("test");

node1
.relay_subscribe(&topic)
Expand Down
15 changes: 8 additions & 7 deletions examples/tic-tac-toe-gui/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::time::{SystemTime, Duration};
use tokio::sync::mpsc;
use waku::{
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic,
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running,
general::pubsubtopic::PubsubTopic,
};

#[derive(Serialize, Deserialize, PartialEq, Debug, Copy, Clone)]
Expand All @@ -26,15 +27,15 @@ struct GameState {
struct TicTacToeApp<State> {
game_state: Arc<Mutex<GameState>>,
waku: WakuNodeHandle<State>,
game_topic: &'static str,
game_topic: PubsubTopic,
tx: mpsc::Sender<String>, // Sender to send `msg` to main thread
player_role: Option<Player>, // Store the player's role (X or O)
}

impl TicTacToeApp<Initialized> {
fn new(
waku: WakuNodeHandle<Initialized>,
game_topic: &'static str,
game_topic: PubsubTopic,
game_state: Arc<Mutex<GameState>>,
tx: mpsc::Sender<String>,
) -> Self {
Expand Down Expand Up @@ -92,7 +93,7 @@ impl TicTacToeApp<Initialized> {

let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic.to_string(), content_topics).expect("waku should subscribe");
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe");

// Connect to hard-coded node
// let target_node_multi_addr =
Expand Down Expand Up @@ -135,7 +136,7 @@ impl TicTacToeApp<Running> {

// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None)
// .expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic.to_string()).expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message");
}

fn make_move(&mut self, row: usize, col: usize) {
Expand Down Expand Up @@ -306,15 +307,15 @@ impl eframe::App for TicTacToeApp<Running> {
async fn main() -> eframe::Result<()> {
let (tx, mut rx) = mpsc::channel::<String>(3200); // Channel to communicate between threads

let game_topic = "/waku/2/rs/16/32";
let game_topic = PubsubTopic::new("/waku/2/rs/16/32");
// Create a Waku instance
let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010),
cluster_id: Some(16),
shards: vec![1, 32, 64, 128, 256],
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
max_message_size: Some("1024KiB".to_string()),
relay_topics: vec![game_topic.to_string()],
relay_topics: vec![String::from(&game_topic)],
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL

keep_alive: Some(true),
Expand Down
1 change: 1 addition & 0 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
pub mod contenttopic;
pub mod pubsubtopic;

// crates
use contenttopic::WakuContentTopic;
Expand Down
16 changes: 16 additions & 0 deletions waku-bindings/src/general/pubsubtopic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PubsubTopic(String);

impl PubsubTopic {
// Constructor to create a new MyString
pub fn new(value: &str) -> Self {
PubsubTopic(value.to_string())
}
}

// to allow conversion from `PubsubTopic` to `String`
impl From<&PubsubTopic> for String {
fn from(topic: &PubsubTopic) -> Self {
topic.0.to_string()
}
}
2 changes: 1 addition & 1 deletion waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! # Waku
//!
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod general;
pub mod general;
pub mod node;
pub mod utils;

Expand Down
11 changes: 5 additions & 6 deletions waku-bindings/src/node/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ use std::ffi::CString;
use libc::*;
// internal
use crate::general::contenttopic::WakuContentTopic;
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};

pub fn waku_filter_subscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics = WakuContentTopic::join_content_topics(content_topics);

let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_ptr = CString::new(content_topics)
Expand Down Expand Up @@ -49,13 +49,12 @@ pub fn waku_filter_subscribe(

pub fn waku_filter_unsubscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);

let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_topics_ptr = CString::new(content_topics_topics)
Expand Down
8 changes: 4 additions & 4 deletions waku-bindings/src/node/lightpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};

use crate::general::pubsubtopic::PubsubTopic;

pub fn waku_lightpush_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();

let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

Expand Down
15 changes: 8 additions & 7 deletions waku-bindings/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
pub use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::utils::LibwakuResponse;

Expand Down Expand Up @@ -105,7 +106,7 @@ impl WakuNodeHandle<Running> {

pub fn relay_publish_txt(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
msg_txt: &String,
content_topic_name: &'static str,
timeout: Option<Duration>,
Expand Down Expand Up @@ -134,33 +135,33 @@ impl WakuNodeHandle<Running> {
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
}

/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic)
}

/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}

pub fn filter_subscribe(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics)
}

pub fn filter_unsubscribe(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics)
Expand All @@ -173,7 +174,7 @@ impl WakuNodeHandle<Running> {
pub fn lightpush_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic)
}
Expand Down
17 changes: 7 additions & 10 deletions waku-bindings/src/node/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;
use libc::*;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
Expand Down Expand Up @@ -61,18 +62,16 @@ pub fn waku_create_content_topic(
pub fn waku_relay_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();

let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

Expand Down Expand Up @@ -106,9 +105,8 @@ pub fn waku_relay_publish_message(
handle_response(code, result)
}

pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &str) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

Expand All @@ -132,9 +130,8 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &str) -> Result
handle_no_response(code, result)
}

pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

Expand Down
19 changes: 12 additions & 7 deletions waku-bindings/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::time;
use tokio::time::sleep;
use waku_bindings::node::PubsubTopic;
use waku_bindings::{
waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
Expand All @@ -21,10 +22,11 @@ fn try_publish_relay_messages(
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> {
let topic = TEST_PUBSUBTOPIC.to_string();
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
Ok(HashSet::from([node.relay_publish_message(
msg,
&PubsubTopic::new(TEST_PUBSUBTOPIC),
None,
)?]))
}

async fn test_echo_messages(
Expand Down Expand Up @@ -67,9 +69,12 @@ async fn test_echo_messages(
let node1 = node1.start()?;
let node2 = node2.start()?;

let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic).unwrap();
node2.relay_subscribe(&topic).unwrap();
node1
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();
node2
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();

sleep(Duration::from_secs(3)).await;

Expand Down

0 comments on commit 8cf8c41

Please sign in to comment.