Skip to content

Commit

Permalink
feat(notifiers): Fixes #44 only for async_std
Browse files Browse the repository at this point in the history
  • Loading branch information
john-bv committed Aug 9, 2023
1 parent 17d605f commit 021145e
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 324 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async_tokio = [ "tokio" ]

[dependencies]
rand = "0.8.3"
binary-utils = { git = "https://github.com/NetrexMC/BinaryUtil", tag = "v0.2.2", version = "0.2.2" }
binary_utils = { git = "https://github.com/NetrexMC/BinaryUtil", tag = "v0.2.2" }
tokio = { version = "1.28.2", features = ["full"], optional = true }
byteorder = "1.4.3"
futures = "0.3.19"
Expand Down
2 changes: 1 addition & 1 deletion examples/async-std/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{net::ToSocketAddrs, vec};
#[async_std::main]
async fn main() {
let mut client = Client::new(10, DEFAULT_MTU);
let mut addr = "ownagepe.com:19132".to_socket_addrs().unwrap();
let mut addr = "zeqa.net:19132".to_socket_addrs().unwrap();
if let Err(_) = client.connect(addr.next().unwrap()).await {
// here you could attempt to retry, but in this case, we'll just exit
println!("Failed to connect to server!");
Expand Down
2 changes: 1 addition & 1 deletion examples/async-std/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ edition = "2021"
[dependencies]
async-std = { version = "1.12.0", features = [ "attributes" ] }
console-subscriber = "0.1.8"
rak_rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] }
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe", "async_tokio" ], default-features = false}
2 changes: 1 addition & 1 deletion examples/async-std/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
async fn handle(mut conn: Connection) {
loop {
// keeping the connection alive
if conn.is_closed() {
if conn.is_closed().await {
println!("Connection closed!");
break;
}
Expand Down
495 changes: 261 additions & 234 deletions src/client/mod.rs

Large diffs are not rendered by default.

185 changes: 102 additions & 83 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@ use std::{
time::Duration,
};

use binary_utils::Streamable;

#[cfg(feature = "async_std")]
use async_std::{
channel::{bounded, Receiver, RecvError, Sender},
net::UdpSocket,
sync::{Mutex, RwLock},
task::{self, sleep, JoinHandle},
};
use binary_utils::Streamable;
#[cfg(feature = "async_std")]
use futures::{
select,
FutureExt
};
#[cfg(feature = "async_tokio")]
use tokio::{
net::UdpSocket,
Expand Down Expand Up @@ -48,7 +54,7 @@ use crate::{
},
rakrs_debug,
server::current_epoch,
util::to_address_token,
util::to_address_token, notify::Notify,
};

use self::{
Expand Down Expand Up @@ -98,7 +104,7 @@ pub struct Connection {
internal_net_recv: ConnNetChan,
/// A notifier for when the connection should close.
/// This is used for absolute cleanup withtin the connection
disconnect: Arc<AtomicBool>,
disconnect: Arc<Notify>,
/// The event dispatcher for the connection.
// evt_sender: Sender<(ServerEvent, oneshot::Sender<ServerEventResponse>)>,
/// The event receiver for the connection.
Expand Down Expand Up @@ -134,7 +140,7 @@ impl Connection {
// evt_receiver,
state: Arc::new(Mutex::new(ConnectionState::Unidentified)),
// disconnect: Arc::new(Condvar::new()),
disconnect: Arc::new(AtomicBool::new(false)),
disconnect: Arc::new(Notify::new()),
recv_time: Arc::new(AtomicU64::new(current_epoch())),
tasks: Arc::new(Mutex::new(Vec::new())),
};
Expand Down Expand Up @@ -162,76 +168,89 @@ impl Connection {
// while handling throttle
return task::spawn(async move {
loop {
sleep(Duration::from_millis(50)).await;
let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
let mut cstate = state.lock().await;

if *cstate == ConnectionState::Disconnected {
rakrs_debug!(
true,
"[{}] Connection has been closed due to state!",
to_address_token(address)
);
// closer.notify_all();
closer.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
select! {
_ = closer.wait().fuse() => {
rakrs_debug!(true, "[{}] [TICK TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
_ = sleep(Duration::from_millis(50)).fuse() => {
let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
let mut cstate = state.lock().await;

if recv + 20000 <= current_epoch() {
*cstate = ConnectionState::Disconnected;
rakrs_debug!(
true,
"[{}] Connection has been closed due to inactivity!",
to_address_token(address)
);
// closer.notify_all();
closer.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
if *cstate == ConnectionState::Disconnected {
rakrs_debug!(
true,
"[{}] Connection has been closed due to state!",
to_address_token(address)
);
// closer.notify_all();
closer.notify().await;
break;
}

if recv + 15000 <= current_epoch() && cstate.is_reliable() {
*cstate = ConnectionState::TimingOut;
rakrs_debug!(
true,
"[{}] Connection is timing out, sending a ping!",
to_address_token(address)
);
}
if recv + 20000 <= current_epoch() {
*cstate = ConnectionState::Disconnected;
rakrs_debug!(
true,
"[{}] Connection has been closed due to inactivity!",
to_address_token(address)
);
// closer.notify_all();
closer.notify().await;
break;
}

let mut sendq = send_queue.write().await;
let mut recv_q = recv_queue.lock().await;
if recv + 15000 <= current_epoch() && cstate.is_reliable() {
*cstate = ConnectionState::TimingOut;
rakrs_debug!(
true,
"[{}] Connection is timing out, sending a ping!",
to_address_token(address)
);
}

if last_ping >= 3000 {
let ping = ConnectedPing {
time: current_epoch() as i64,
};
if let Ok(_) = sendq
.send_packet(ping.into(), Reliability::Reliable, true)
.await
{};
last_ping = 0;
} else {
last_ping += 50;
}
let mut sendq = send_queue.write().await;
let mut recv_q = recv_queue.lock().await;

if last_ping >= 3000 {
let ping = ConnectedPing {
time: current_epoch() as i64,
};
if let Ok(_) = sendq
.send_packet(ping.into(), Reliability::Reliable, true)
.await
{};
last_ping = 0;
} else {
last_ping += 50;
}

sendq.update().await;
sendq.update().await;

// Flush the queue of acks and nacks, and respond to them
let ack = Ack::from_records(recv_q.ack_flush(), false);
if ack.records.len() > 0 {
if let Ok(p) = ack.parse() {
sendq.send_stream(&p).await;
}
}
// Flush the queue of acks and nacks, and respond to them
let ack = Ack::from_records(recv_q.ack_flush(), false);
if ack.records.len() > 0 {
if let Ok(p) = ack.parse() {
sendq.send_stream(&p).await;
}
}

// flush nacks from recv queue
let nack = Ack::from_records(recv_q.nack_queue(), true);
if nack.records.len() > 0 {
if let Ok(p) = nack.parse() {
sendq.send_stream(&p).await;
// flush nacks from recv queue
let nack = Ack::from_records(recv_q.nack_queue(), true);
if nack.records.len() > 0 {
if let Ok(p) = nack.parse() {
sendq.send_stream(&p).await;
}
}
}
}
}

rakrs_debug!(
true,
"[{}] Connection has been closed due to end of tick!",
to_address_token(address)
);
});
}

Expand All @@ -251,14 +270,6 @@ impl Connection {

return task::spawn(async move {
loop {
if disconnect.load(std::sync::atomic::Ordering::Relaxed) {
rakrs_debug!(
true,
"[{}] Recv task has been closed!",
to_address_token(address)
);
break;
}
macro_rules! handle_payload {
($payload: ident) => {
// We've recieved a payload!
Expand Down Expand Up @@ -297,7 +308,7 @@ impl Connection {
// DISCONNECT
// disconnect.close();
rakrs_debug!(true, "[{}] Connection::process_packet returned true!", to_address_token(address));
disconnect.store(true, std::sync::atomic::Ordering::Relaxed);
disconnect.notify().await;
break;
}
}
Expand Down Expand Up @@ -363,16 +374,24 @@ impl Connection {
};
}

match net.recv().await {
#[cfg(feature = "async_std")]
Ok(payload) => {
handle_payload!(payload);
select! {
_ = disconnect.wait().fuse() => {
rakrs_debug!(true, "[{}] [RECV TASK] Connection has been closed due to closer!", to_address_token(address));
break;
}
#[cfg(feature = "async_tokio")]
Some(payload) => {
handle_payload!(payload);
res = net.recv().fuse() => {
match res {
#[cfg(feature = "async_std")]
Ok(payload) => {
handle_payload!(payload);
}
#[cfg(feature = "async_tokio")]
Some(payload) => {
handle_payload!(payload);
}
_ => continue,
}
}
_ => continue,
}
}
});
Expand Down Expand Up @@ -521,8 +540,8 @@ impl Connection {
// }
// }

pub fn is_closed(&self) -> bool {
self.disconnect.load(std::sync::atomic::Ordering::Acquire)
pub async fn is_closed(&self) -> bool {
!self.state.lock().await.is_available()
}

/// Send a packet to the client.
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ pub mod util;

pub use protocol::mcpe::{self, motd::Motd};
pub use server::Listener;

/// An internal module for notifying the connection of state updates.
pub(crate) mod notify;
41 changes: 38 additions & 3 deletions src/notify/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,39 @@
/// Notifies any task to wake up
pub struct Notify {
#[cfg(feature = "async_std")]
use async_std::channel::{Receiver, Sender};

}
#[cfg(feature = "async_tokio")]
use tokio::sync::mpsc::{Receiver, Sender};

use crate::connection::queue::recv;

/// Notify is a struct that wraps a buffer channel
/// these channels are used to send messages to the main thread.
#[derive(Clone)]
pub struct Notify(pub Option<Sender<()>>, pub Receiver<()>);

impl Notify {
/// Creates a new Notify struct.
pub fn new() -> Self {
let (send, recv) = async_std::channel::bounded(1);
Self(Some(send), recv)
}

/// Sends a message to all listeners.
pub async fn notify(&self) -> bool {
if let Some(sender) = &self.0 {
sender.close();
true
} else {
false
}
}

/// Waits for a message from the main thread.
pub async fn wait(&self) -> bool {
if let Err(_) = self.1.recv().await {
false
} else {
true
}
}
}
3 changes: 3 additions & 0 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ pub const MAX_ORD_CHANS: u8 = 32;
pub const RAKNET_HEADER_FRAME_OVERHEAD: u16 = 20 + 8 + 8 + 4 + 20;
// IP Header + UDP Header + RakNet Header
pub const RAKNET_HEADER_OVERHEAD: u16 = 20 + 8 + 8;

pub const MTU_MAX: u16 = 2400;
pub const MTU_MIN: u16 = 400;

0 comments on commit 021145e

Please sign in to comment.