From 496596ada61318c78d7dd1a2e149d04d80b9cc4a Mon Sep 17 00:00:00 2001 From: john-bv Date: Sat, 23 Sep 2023 13:38:19 -0500 Subject: [PATCH] chore: Attempt to fix #53 --- examples/async-std/client/src/main.rs | 6 +-- examples/async-std/server/Cargo.toml | 4 +- src/client/handshake.rs | 61 +++++++++++++++++----- src/client/mod.rs | 75 +++++++++++++++++---------- src/connection/mod.rs | 4 +- src/protocol/packet/online.rs | 2 +- src/server/mod.rs | 12 +++-- 7 files changed, 112 insertions(+), 52 deletions(-) diff --git a/examples/async-std/client/src/main.rs b/examples/async-std/client/src/main.rs index 93d3df4..b9f5cd4 100644 --- a/examples/async-std/client/src/main.rs +++ b/examples/async-std/client/src/main.rs @@ -4,10 +4,10 @@ use std::{net::ToSocketAddrs, vec}; #[async_std::main] async fn main() { let mut client = Client::new(10, DEFAULT_MTU); - let mut addr = "na.zeqa.net:19132".to_socket_addrs().unwrap(); - if let Err(_) = client.connect(addr.next().unwrap()).await { + let mut addr = "zeqa.net:19132".to_socket_addrs().unwrap(); + if let Err(e) = client.connect(addr.next().unwrap()).await { // here you could attempt to retry, but in this case, we'll just exit - println!("Failed to connect to server!"); + println!("Failed to connect to server: {:?}", e); return; } diff --git a/examples/async-std/server/Cargo.toml b/examples/async-std/server/Cargo.toml index ee37f2a..94f25e1 100644 --- a/examples/async-std/server/Cargo.toml +++ b/examples/async-std/server/Cargo.toml @@ -8,5 +8,5 @@ edition = "2021" [dependencies] async-std = { version = "1.12.0", features = [ "attributes" ] } console-subscriber = "0.1.8" -# rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] } -rak-rs = { path = "../../../", features = [ "mcpe" ] } +rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] } +# rak-rs = { path = "../../../", features = [ "mcpe" ] } diff --git a/src/client/handshake.rs b/src/client/handshake.rs index aab3f92..7492ef8 100644 --- a/src/client/handshake.rs +++ b/src/client/handshake.rs @@ -156,7 +156,7 @@ pub enum HandshakeStatus { Completed, } -struct HandshakeState { +pub(crate) struct HandshakeState { status: HandshakeStatus, done: bool, waker: Option, @@ -233,27 +233,36 @@ impl ClientHandshake { ); let mut recv_q = RecvQueue::new(); - let connect_request = ConnectionRequest { - time: current_epoch() as i64, - client_id: id, - security: false, - }; - - if let Err(_) = send_q - .send_packet(connect_request.into(), Reliability::Reliable, true) - .await - { + if let Err(_) = Self::send_connection_request(&mut send_q, id).await { update_state!(true, shared_state, HandshakeStatus::Failed); } rakrs_debug!(true, "[CLIENT] Sent ConnectionRequest to server!"); + let mut send_time = current_epoch() as i64; + let mut tries = 0_u8; + let mut buf: [u8; 2048] = [0; 2048]; loop { let len: usize; let rec = socket.recv_from(&mut buf).await; + if (send_time + 2) <= current_epoch() as i64 { + send_time = current_epoch() as i64; + + rakrs_debug!(true, "[CLIENT] Server did not reply with ConnectAccept, sending another..."); + + if let Err(_) = Self::send_connection_request(&mut send_q, id).await { + update_state!(true, shared_state, HandshakeStatus::Failed); + } + + tries += 1; + if tries >= 5 { + update_state!(true, shared_state, HandshakeStatus::Failed); + } + } + match rec { Err(_) => { continue; @@ -267,7 +276,9 @@ impl ClientHandshake { match buf[0] { 0x80..=0x8d => { if let Ok(pk) = FramePacket::read(&mut reader) { - recv_q.insert(pk).unwrap(); + if let Err(_) = recv_q.insert(pk) { + continue; + } let raw_packets = recv_q.flush(); @@ -342,7 +353,12 @@ impl ClientHandshake { ); } } - _ => {} + _ => { + rakrs_debug!( + true, + "[CLIENT] Received unknown packet from server!" + ); + } } } } @@ -355,6 +371,25 @@ impl ClientHandshake { Self { status: state } } + + pub(crate) async fn send_connection_request(send_q: &mut SendQueue, id: i64) -> std::io::Result<()> { + let connect_request = ConnectionRequest { + time: current_epoch() as i64, + client_id: id, + security: false, + }; + + if let Err(_) = send_q + .send_packet(connect_request.into(), Reliability::Reliable, true) + .await + { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to send ConnectionRequest!", + )); + } + return Ok(()); + } } impl Future for ClientHandshake { diff --git a/src/client/mod.rs b/src/client/mod.rs index b96e7da..5343b1c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -369,16 +369,38 @@ impl Client { } }); - let recv_task = self.init_recv_task().await?; - let tick_task = self.init_connect_tick(send_queue.clone()).await?; + let recv_task = self.init_recv_task(); + let tisk_task = self.init_connect_tick(send_queue.clone()); + + if let Err(e) = recv_task { + rakrs_debug!(true, "[CLIENT] Failed to start recv task: {:?}", e); + return Err(ClientError::Killed); + } + + if let Err(e) = tisk_task { + rakrs_debug!(true, "[CLIENT] Failed to start connect tick task: {:?}", e); + return Err(ClientError::Killed); + } + + let recv_task = recv_task.unwrap(); + let tisk_task = tisk_task.unwrap(); + + if *self.state.lock().await != ConnectionState::Identified { + return Err(ClientError::AlreadyOnline); + } + + self.update_state(ConnectionState::Connected).await; + + let mut tasks = self.tasks.lock().await; // Responsible for the raw socket - self.push_task(socket_task).await; + tasks.push(socket_task); // Responsible for digesting messages from the network - self.push_task(recv_task).await; + tasks.push(recv_task); // Responsible for sending packets to the server and keeping the connection alive - self.push_task(tick_task).await; + tasks.push(tisk_task); + rakrs_debug!("[CLIENT] Client is now connected!"); Ok(()) } @@ -571,11 +593,7 @@ impl Client { } } - async fn push_task(&self, task: JoinHandle<()>) { - self.tasks.lock().await.push(task); - } - - async fn init_recv_task(&self) -> Result, ClientError> { + fn init_recv_task(&self) -> Result, ClientError> { let net_recv = match self.network_recv { Some(ref n) => n.clone(), None => { @@ -598,7 +616,7 @@ impl Client { let state = self.state.clone(); let recv_time = self.recv_time.clone(); - let r = task::spawn(async move { + return Ok(task::spawn(async move { 'task_loop: loop { #[cfg(feature = "async_std")] let net_dispatch = net_recv.lock().await; @@ -795,34 +813,29 @@ impl Client { } } } - }); - Ok(r) + })); } /// This is an internal function that initializes the client connection. /// This is called by `Client::connect()`. - async fn init_connect_tick( + fn init_connect_tick( &self, send_queue: Arc>, ) -> Result, ClientError> { // verify that the client is offline - if *self.state.lock().await != ConnectionState::Identified { - return Err(ClientError::AlreadyOnline); - } - self.update_state(ConnectionState::Connected).await; - let closer_dispatch = self.close_notifier.clone(); let recv_queue = self.recv_queue.clone(); let state = self.state.clone(); let last_recv = self.recv_time.clone(); let mut last_ping: u16 = 0; - let t = task::spawn(async move { + return Ok(task::spawn(async move { loop { let closer = closer_dispatch.lock().await; macro_rules! tick_body { () => { + rakrs_debug!(true, "[CLIENT] Running connect tick task"); let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed); let mut state = state.lock().await; @@ -843,25 +856,32 @@ impl Client { continue; } - if (recv + 20000) <= current_epoch() { + if (recv + 20) <= current_epoch() { *state = ConnectionState::Disconnected; rakrs_debug!(true, "[CLIENT] Client timed out. Closing connection..."); closer.notify().await; break; } - if recv + 15000 <= current_epoch() && state.is_reliable() { + let mut send_q = send_queue.write().await; + let mut recv_q = recv_queue.lock().await; + + if recv + 10 <= current_epoch() && state.is_reliable() { *state = ConnectionState::TimingOut; rakrs_debug!( true, "[CLIENT] Connection is timing out, sending a ping!", ); + let ping = ConnectedPing { + time: current_epoch() as i64, + }; + if let Ok(_) = send_q + .send_packet(ping.into(), Reliability::Reliable, true) + .await + {} } - let mut send_q = send_queue.write().await; - let mut recv_q = recv_queue.lock().await; - - if last_ping >= 3000 { + if last_ping >= 500 { let ping = ConnectedPing { time: current_epoch() as i64, }; @@ -920,8 +940,7 @@ impl Client { } } } - }); - Ok(t) + })); } } diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 00c81b2..918ac19 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -258,7 +258,7 @@ impl Connection { break; } - if recv + 20000 <= current_epoch() { + if recv + 15 <= current_epoch() { *cstate = ConnectionState::Disconnected; rakrs_debug!( true, @@ -270,7 +270,7 @@ impl Connection { break; } - if recv + 15000 <= current_epoch() && cstate.is_reliable() { + if recv + 10 <= current_epoch() && cstate.is_reliable() { *cstate = ConnectionState::TimingOut; rakrs_debug!( true, diff --git a/src/protocol/packet/online.rs b/src/protocol/packet/online.rs index f41c291..9b4906c 100644 --- a/src/protocol/packet/online.rs +++ b/src/protocol/packet/online.rs @@ -111,7 +111,7 @@ impl Reader for ConnectionAccept { for _ in 0..20 { // we only have the request time and timestamp left... - if buf.as_slice().len() < 16 { + if buf.as_slice().len() <= 16 { break; } internal_ids.push(buf.read_type::()?); diff --git a/src/server/mod.rs b/src/server/mod.rs index d443cb3..be26ff4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -344,7 +344,6 @@ impl Listener { let mut buf: [u8; 2048] = [0; 2048]; #[cfg(feature = "mcpe")] let motd_default = default_motd.clone(); - loop { let length: usize; let origin: SocketAddr; @@ -357,8 +356,15 @@ impl Listener { origin = o; } Err(e) => { - rakrs_debug!(true, "Error: {:?}", e); - continue; + match e.kind() { + std::io::ErrorKind::ConnectionReset => { + continue; + }, + _ => { + rakrs_debug!(true, "[SERVER-SOCKET] Failed to recieve packet! {}", e); + continue; + } + } } }