Skip to content

Commit

Permalink
feat(src/notify): Add notifiers for tokio & cleanup conditional imports
Browse files Browse the repository at this point in the history
  • Loading branch information
john-bv committed Aug 10, 2023
1 parent 8476650 commit d7c856a
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 69 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ authors = ["Bavfalcon9 <olybear9@gmail.com>"]
edition = "2021"

[features]
default = [ "async_std" ]
# default = [ "async_std" ]
default = ["async_tokio" ]
mcpe = []
debug = []
debug_all = []
Expand Down
2 changes: 1 addition & 1 deletion examples/async-std/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ edition = "2021"
[dependencies]
async-std = { version = "1.12.0", features = [ "attributes" ] }
console-subscriber = "0.1.8"
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe", "async_tokio" ], default-features = false}
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] }
2 changes: 1 addition & 1 deletion examples/async-std/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rak_rs::Listener;
async fn main() {
// console_subscriber::init();
let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
server.motd.name = "RakNet Rust!".to_string();
server.motd.name = "RakNet Rust (async-std)!".to_string();
server.motd.gamemode = Gamemode::Survival;

server.start().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/tokio/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ edition = "2021"
[dependencies]
async-std = { version = "1.12.0", features = [ "attributes" ] }
console-subscriber = "0.1.8"
rak_rs = { path = "../", features = [ "debug", "debug_all", "mcpe", "async_tokio" ], default-features = false }
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe", "async_tokio" ], default-features = false }
tokio = "1.23.0"
8 changes: 3 additions & 5 deletions examples/tokio/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use rak_rs::server::event::ServerEventResponse;
async fn main() {
console_subscriber::init();
let mut server = Listener::bind("0.0.0.0:19132").await.unwrap();
// let inner = server.recv_evnt.clone();
server.motd.name = "RakNet Rust!".to_string();
server.motd.name = "RakNet Rust (tokio)!".to_string();
server.motd.gamemode = Gamemode::Survival;

server.start().await.unwrap();
Expand All @@ -26,13 +25,12 @@ async fn main() {
async fn handle(mut conn: Connection) {
loop {
// keeping the connection alive
if conn.is_closed() {
if conn.is_closed().await {
println!("Connection closed!");
break;
}
if let Ok(pk) = conn.recv().await {
println!("Got a connection packet {:?} ", pk);
println!("(RAKNET RECIEVE SIDE) Got a connection packet {:?} ", pk);
}
// conn.tick().await;
}
}
2 changes: 1 addition & 1 deletion src/client/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::task::{Context, Poll, Waker};
use tokio::{
net::UdpSocket,
task::{self},
time::{sleep, timeout},
time::timeout,
};

use crate::connection::queue::send::SendQueue;
Expand Down
109 changes: 92 additions & 17 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl Client {
loop {
let length: usize;

#[cfg(feature = "async_std")]
select! {
killed = notifier.wait().fuse() => {
if killed {
Expand All @@ -221,6 +222,31 @@ impl Client {
}
}
};

#[cfg(feature = "async_tokio")]
select! {
killed = notifier.wait() => {
if killed {
rakrs_debug!(true, "[CLIENT] Socket task closed");
break;
}
}

recv = socket.recv(&mut buf) => {
match recv {
Ok(l) => length = l,
Err(e) => {
rakrs_debug!(true, "[CLIENT] Failed to receive packet: {}", e);
continue;
}
}
// no assertions because this is a client
// this allows the user to customize their own packet handling
if let Err(_) = net_send.send(buf[..length].to_vec()).await {
rakrs_debug!(true, "[CLIENT] Failed to send packet to network recv channel. Is the client closed?");
}
}
};
}
});

Expand Down Expand Up @@ -366,15 +392,20 @@ impl Client {
}
}

#[cfg(feature = "async_std")]
pub async fn recv(&self) -> Result<Vec<u8>, RecvError> {
match self.internal_recv.recv().await {
#[cfg(feature = "async_std")]
Ok(packet) => Ok(packet),
#[cfg(feature = "async_std")]
Err(e) => Err(e),
#[cfg(feature = "async_tokio")]
Err(e) => Err(e)
}
}

#[cfg(feature = "async_tokio")]
pub async fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
match self.internal_recv.recv().await {
Some(packet) => Ok(packet),
#[cfg(feature = "async_tokio")]
None => Err(RecvError::Closed),
}
}
Expand Down Expand Up @@ -452,25 +483,22 @@ impl Client {

let r = task::spawn(async move {
'task_loop: loop {
#[cfg(feature = "async_std")]
let net_dispatch = net_recv.lock().await;
let closed_dispatch = closed.lock().await;
select! {
killed = closed_dispatch.wait().fuse() => {
if killed {
rakrs_debug!(true, "[CLIENT] Recv task closed");
break;
}
}
pk_recv = net_dispatch.recv().fuse() => {
#[cfg(feature = "async_tokio")]
let mut net_dispatch = net_recv.lock().await;

let closed_dispatch = closed.lock().await;
macro_rules! recv_body {
($pk_recv: expr) => {
#[cfg(feature = "async_std")]
if let Err(_) = pk_recv {
if let Err(_) = $pk_recv {
rakrs_debug!(true, "[CLIENT] (recv_task) Failed to recieve anything on netowrk channel, is there a sender?");
continue;
}

#[cfg(feature = "async_tokio")]
if let None = pk_recv {
if let None = $pk_recv {
rakrs_debug!(true, "[CLIENT] (recv_task)Failed to recieve anything on netowrk channel, is there a sender?");
continue;
}
Expand All @@ -492,7 +520,7 @@ impl Client {
// drop here so the lock isn't held for too long
drop(client_state);

let mut buffer = pk_recv.unwrap();
let mut buffer = $pk_recv.unwrap();

match buffer[0] {
0x80..=0x8d => {
Expand Down Expand Up @@ -616,6 +644,32 @@ impl Client {
}
}
}
};
}

#[cfg(feature = "async_std")]
select! {
killed = closed_dispatch.wait().fuse() => {
if killed {
rakrs_debug!(true, "[CLIENT] Recv task closed");
break;
}
}
pk_recv = net_dispatch.recv().fuse() => {
recv_body!(pk_recv);
}
}

#[cfg(feature = "async_tokio")]
select! {
killed = closed_dispatch.wait() => {
if killed {
rakrs_debug!(true, "[CLIENT] Recv task closed");
break;
}
}
pk_recv = net_dispatch.recv() => {
recv_body!(pk_recv);
}
}
}
Expand Down Expand Up @@ -644,8 +698,9 @@ impl Client {
let t = task::spawn(async move {
loop {
let closer = closer_dispatch.lock().await;
select! {
_ = sleep(Duration::from_millis(50)).fuse() => {

macro_rules! tick_body {
() => {
let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
let mut state = state.lock().await;

Expand Down Expand Up @@ -711,6 +766,13 @@ impl Client {
send_q.send_stream(&p).await;
}
}
};
}

#[cfg(feature = "async_std")]
select! {
_ = sleep(Duration::from_millis(50)).fuse() => {
tick_body!();
},
killed = closer.wait().fuse() => {
if killed {
Expand All @@ -719,6 +781,19 @@ impl Client {
}
}
}

#[cfg(feature = "async_tokio")]
select! {
_ = sleep(Duration::from_millis(50)) => {
tick_body!();
},
killed = closer.wait() => {
if killed {
rakrs_debug!(true, "[CLIENT] Connect tick task closed");
break;
}
}
}
}
});
Ok(t)
Expand Down
54 changes: 45 additions & 9 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio::{
mpsc::{channel as bounded, Receiver, Sender},
Mutex, RwLock,
},
select,
task::{self, JoinHandle},
time::sleep,
};
Expand Down Expand Up @@ -163,12 +164,8 @@ impl Connection {
// while handling throttle
return task::spawn(async move {
loop {
select! {
_ = closer.wait().fuse() => {
rakrs_debug!(true, "[{}] [TICK TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
_ = sleep(Duration::from_millis(50)).fuse() => {
macro_rules! tick_body {
() => {
let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
let mut cstate = state.lock().await;

Expand Down Expand Up @@ -237,6 +234,28 @@ impl Connection {
sendq.send_stream(&p).await;
}
}
};
}

#[cfg(feature = "async_std")]
select! {
_ = closer.wait().fuse() => {
rakrs_debug!(true, "[{}] [TICK TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
_ = sleep(Duration::from_millis(50)).fuse() => {
tick_body!();
}
}

#[cfg(feature = "async_tokio")]
select! {
_ = closer.wait() => {
rakrs_debug!(true, "[{}] [TICK TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
_ = sleep(Duration::from_millis(50)) => {
tick_body!();
}
}
}
Expand All @@ -253,7 +272,12 @@ impl Connection {
///
pub async fn init_net_recv(
&self,
// THIS IS ONLY ACTIVATED ON STD
#[cfg(feature = "async_std")]
net: Receiver<Vec<u8>>,
// ONLY ACTIVATED ON TOKIO
#[cfg(feature = "async_tokio")]
mut net: Receiver<Vec<u8>>,
sender: Sender<Vec<u8>>,
) -> task::JoinHandle<()> {
let recv_time = self.recv_time.clone();
Expand Down Expand Up @@ -369,25 +393,37 @@ impl Connection {
};
}

#[cfg(feature = "async_std")]
select! {
_ = disconnect.wait().fuse() => {
rakrs_debug!(true, "[{}] [RECV TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
res = net.recv().fuse() => {
match res {
#[cfg(feature = "async_std")]
Ok(payload) => {
handle_payload!(payload);
}
#[cfg(feature = "async_tokio")]
_ => continue,
}
}
};

#[cfg(feature = "async_tokio")]
select! {
_ = disconnect.wait() => {
rakrs_debug!(true, "[{}] [RECV TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
res = net.recv() => {
match res {
Some(payload) => {
handle_payload!(payload);
}
_ => continue,
}
}
}
};
}
});
}
Expand Down
Loading

0 comments on commit d7c856a

Please sign in to comment.