Skip to content

Commit

Permalink
wide adaptations to make the waku crate behave tokio-based async
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Dec 19, 2024
1 parent 9d73660 commit e937e05
Show file tree
Hide file tree
Showing 18 changed files with 449 additions and 290 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 15 additions & 8 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ async fn main() -> Result<(), Error> {
tcp_port: Some(60010), // TODO: use any available port.
..Default::default()
}))
.await
.expect("should instantiate");

let node2 = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60020), // TODO: use any available port.
..Default::default()
}))
.await
.expect("should instantiate");

// ========================================================================
Expand All @@ -31,7 +33,7 @@ async fn main() -> Result<(), Error> {

match event {
WakuEvent::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
// println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
Expand All @@ -54,7 +56,7 @@ async fn main() -> Result<(), Error> {

match event {
WakuEvent::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
// println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
Expand All @@ -69,30 +71,34 @@ async fn main() -> Result<(), Error> {
})
.expect("set event call back working");

let node1 = node1.start().expect("node1 should start");
let node2 = node2.start().expect("node2 should start");
let node1 = node1.start().await.expect("node1 should start");
let node2 = node2.start().await.expect("node2 should start");

// ========================================================================
// Subscribe to pubsub topic
let topic = PubsubTopic::new("test");

node1
.relay_subscribe(&topic)
.await
.expect("node1 should subscribe");

node2
.relay_subscribe(&topic)
.await
.expect("node2 should subscribe");

// ========================================================================
// Connect nodes with each other

let addresses2 = node2
.listen_addresses()
.await
.expect("should obtain the addresses");

node1
.connect(&addresses2[0], None)
.await
.expect("node1 should connect to node2");

// ========================================================================
Expand All @@ -119,6 +125,7 @@ async fn main() -> Result<(), Error> {
);
node1
.relay_publish_message(&message, &topic, None)
.await
.expect("should have sent the message");

// ========================================================================
Expand All @@ -129,13 +136,13 @@ async fn main() -> Result<(), Error> {
// ========================================================================
// Stop both instances

let node1 = node1.stop().expect("should stop");
let node2 = node2.stop().expect("should stop");
let node1 = node1.stop().await.expect("should stop");
let node2 = node2.stop().await.expect("should stop");

// ========================================================================
// Free resources
node1.waku_destroy().expect("should deallocate");
node2.waku_destroy().expect("should deallocate");
node1.waku_destroy().await.expect("should deallocate");
node2.waku_destroy().await.expect("should deallocate");

Ok(())
}
41 changes: 27 additions & 14 deletions examples/tic-tac-toe-gui/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, Duration};
use tokio::task;

use tokio::sync::mpsc;
use waku::{
Expand Down Expand Up @@ -48,7 +49,7 @@ impl TicTacToeApp<Initialized> {
}
}

fn start(self) -> TicTacToeApp<Running> {
async fn start(self) -> TicTacToeApp<Running> {
let tx_clone = self.tx.clone();

let my_closure = move |response| {
Expand Down Expand Up @@ -84,14 +85,14 @@ impl TicTacToeApp<Initialized> {
self.waku.set_event_callback(my_closure).expect("set event call back working");

// Start the waku node
let waku = self.waku.start().expect("waku should start");
let waku = self.waku.start().await.expect("waku should start");

// Subscribe to desired topic using the relay protocol
// self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");
waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe");

let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe");
// let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
// let content_topics = vec![ctopic];
// waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe");

// Connect to hard-coded node
// let target_node_multi_addr =
Expand All @@ -114,7 +115,7 @@ impl TicTacToeApp<Initialized> {
}

impl TicTacToeApp<Running> {
fn send_game_state(&self, game_state: &GameState) {
async fn send_game_state(&self, game_state: &GameState) {
let serialized_game_state = serde_json::to_string(game_state).unwrap();
let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);

Expand All @@ -132,9 +133,11 @@ impl TicTacToeApp<Running> {
false,
);

// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None)
// .expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message");
if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await {
dbg!(format!("message hash published: {}", msg_hash));
}

// self.waku.lightpush_publish_message(&message, &self.game_topic);
}

fn make_move(&mut self, row: usize, col: usize) {
Expand All @@ -159,7 +162,17 @@ impl TicTacToeApp<Running> {
};
}

self.send_game_state(&game_state); // Send updated state after a move
// Call the async function in a blocking context
task::block_in_place(|| {
// Obtain the current runtime handle
let handle = tokio::runtime::Handle::current();

// Block on the async function
handle.block_on(async {
// Assuming `self` is available in the current context
self.send_game_state(&game_state).await;
});
});
}
}
}
Expand Down Expand Up @@ -314,7 +327,7 @@ async fn main() -> eframe::Result<()> {
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
max_message_size: Some("1024KiB".to_string()),
relay_topics: vec![String::from(&game_topic)],
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
log_level: Some("FATAL"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL

keep_alive: Some(true),

Expand All @@ -326,7 +339,7 @@ async fn main() -> eframe::Result<()> {
// discv5_enr_auto_update: Some(false),

..Default::default()
}))
})).await
.expect("should instantiate");

let game_state = GameState {
Expand All @@ -339,7 +352,7 @@ async fn main() -> eframe::Result<()> {
let clone = shared_state.clone();
let app = TicTacToeApp::new(waku, game_topic, clone, tx);

let app = app.start();
let app = app.start().await;

let clone = shared_state.clone();
// Listen for messages in the main thread
Expand Down
3 changes: 2 additions & 1 deletion examples/toy-chat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ tui = "0.19"
crossterm = "0.25"
unicode-width = "0.1"
prost = "0.11"
chrono = "0.4"
chrono = "0.4"
tokio = { version = "1", features = ["full"] }
58 changes: 35 additions & 23 deletions examples/toy-chat/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod protocol;

use crate::protocol::{Chat2Message, TOY_CHAT_CONTENT_TOPIC};
use tokio::task;
use chrono::Utc;
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
Expand Down Expand Up @@ -48,7 +49,7 @@ struct App<State> {
}

impl App<Initialized> {
fn new(nick: String) -> Result<App<Initialized>> {
async fn new(nick: String) -> Result<App<Initialized>> {
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010),
Expand All @@ -69,7 +70,7 @@ impl App<Initialized> {
// discv5_enr_auto_update: Some(false),

..Default::default()
}))?;
})).await?;

Ok(App {
input: String::new(),
Expand All @@ -80,7 +81,7 @@ impl App<Initialized> {
})
}

fn start_waku_node(self) -> Result<App<Running>> {
async fn start_waku_node(self) -> Result<App<Running>> {

let shared_messages = Arc::clone(&self.messages);

Expand Down Expand Up @@ -116,10 +117,10 @@ impl App<Initialized> {
}
})?;

let waku = self.waku.start()?;
let waku = self.waku.start().await?;

let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
waku.relay_subscribe(&pubsub_topic)?;
waku.relay_subscribe(&pubsub_topic).await?;

Ok(App {
input: self.input,
Expand All @@ -133,8 +134,8 @@ impl App<Initialized> {

impl App<Running> {

fn retrieve_history(&mut self) {
let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).unwrap();
async fn retrieve_history(&mut self) {
let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).await.unwrap();
let messages:Vec<_> = messages
.iter()
.map(|store_resp_msg| {
Expand Down Expand Up @@ -183,15 +184,25 @@ impl App<Running> {
false,
);

let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
if let Err(e) = self.waku.relay_publish_message(
&waku_message,
&pubsub_topic,
None,
) {
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
}
// Call the async function in a blocking context
task::block_in_place(|| {
// Obtain the current runtime handle
let handle = tokio::runtime::Handle::current();

// Block on the async function
handle.block_on(async {
// Assuming `self` is available in the current context
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
if let Err(e) = self.waku.relay_publish_message(
&waku_message,
&pubsub_topic,
None,
).await {
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
}
});
});
}
KeyCode::Char(c) => {
self.input.push(c);
Expand All @@ -210,16 +221,17 @@ impl App<Running> {
}
}

fn stop_app(self) {
self.waku.stop().expect("the node should stop properly");
async fn stop_app(self) {
self.waku.stop().await.expect("the node should stop properly");
}
}

fn main() -> std::result::Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn Error>> {
let nick = std::env::args().nth(1).expect("Nick to be set");

let app = App::new(nick)?;
let mut app = app.start_waku_node()?;
let app = App::new(nick).await?;
let mut app = app.start_waku_node().await?;

// setup terminal
enable_raw_mode()?;
Expand All @@ -228,9 +240,9 @@ fn main() -> std::result::Result<(), Box<dyn Error>> {
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;

app.retrieve_history();
app.retrieve_history().await;
let res = app.run_main_loop(&mut terminal);
app.stop_app();
app.stop_app().await;

// restore terminal
disable_raw_mode()?;
Expand Down
8 changes: 8 additions & 0 deletions waku-bindings/src/general/contenttopic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// std
use crate::general::Result;
use crate::utils::WakuDecode;
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
Expand Down Expand Up @@ -79,6 +81,12 @@ impl WakuContentTopic {
}
}

impl WakuDecode for WakuContentTopic {
fn decode(input: &str) -> Result<Self> {
Ok(serde_json::from_str(input).expect("could not parse store resp"))
}
}

impl FromStr for WakuContentTopic {
type Err = String;

Expand Down
Loading

0 comments on commit e937e05

Please sign in to comment.