Skip to content

Commit

Permalink
chore: Attempt to fix #53
Browse files Browse the repository at this point in the history
  • Loading branch information
john-bv committed Sep 23, 2023
1 parent 6d600cd commit 496596a
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 52 deletions.
6 changes: 3 additions & 3 deletions examples/async-std/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::{net::ToSocketAddrs, vec};
#[async_std::main]
async fn main() {
let mut client = Client::new(10, DEFAULT_MTU);
let mut addr = "na.zeqa.net:19132".to_socket_addrs().unwrap();
if let Err(_) = client.connect(addr.next().unwrap()).await {
let mut addr = "zeqa.net:19132".to_socket_addrs().unwrap();
if let Err(e) = client.connect(addr.next().unwrap()).await {
// here you could attempt to retry, but in this case, we'll just exit
println!("Failed to connect to server!");
println!("Failed to connect to server: {:?}", e);
return;
}

Expand Down
4 changes: 2 additions & 2 deletions examples/async-std/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ edition = "2021"
[dependencies]
async-std = { version = "1.12.0", features = [ "attributes" ] }
console-subscriber = "0.1.8"
# rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] }
rak-rs = { path = "../../../", features = [ "mcpe" ] }
rak-rs = { path = "../../../", features = [ "debug", "debug_all", "mcpe" ] }
# rak-rs = { path = "../../../", features = [ "mcpe" ] }
61 changes: 48 additions & 13 deletions src/client/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub enum HandshakeStatus {
Completed,
}

struct HandshakeState {
pub(crate) struct HandshakeState {
status: HandshakeStatus,
done: bool,
waker: Option<Waker>,
Expand Down Expand Up @@ -233,27 +233,36 @@ impl ClientHandshake {
);
let mut recv_q = RecvQueue::new();

let connect_request = ConnectionRequest {
time: current_epoch() as i64,
client_id: id,
security: false,
};

if let Err(_) = send_q
.send_packet(connect_request.into(), Reliability::Reliable, true)
.await
{
if let Err(_) = Self::send_connection_request(&mut send_q, id).await {
update_state!(true, shared_state, HandshakeStatus::Failed);
}

rakrs_debug!(true, "[CLIENT] Sent ConnectionRequest to server!");

let mut send_time = current_epoch() as i64;
let mut tries = 0_u8;

let mut buf: [u8; 2048] = [0; 2048];

loop {
let len: usize;
let rec = socket.recv_from(&mut buf).await;

if (send_time + 2) <= current_epoch() as i64 {
send_time = current_epoch() as i64;

rakrs_debug!(true, "[CLIENT] Server did not reply with ConnectAccept, sending another...");

if let Err(_) = Self::send_connection_request(&mut send_q, id).await {
update_state!(true, shared_state, HandshakeStatus::Failed);
}

tries += 1;
if tries >= 5 {
update_state!(true, shared_state, HandshakeStatus::Failed);
}
}

match rec {
Err(_) => {
continue;
Expand All @@ -267,7 +276,9 @@ impl ClientHandshake {
match buf[0] {
0x80..=0x8d => {
if let Ok(pk) = FramePacket::read(&mut reader) {
recv_q.insert(pk).unwrap();
if let Err(_) = recv_q.insert(pk) {
continue;
}

let raw_packets = recv_q.flush();

Expand Down Expand Up @@ -342,7 +353,12 @@ impl ClientHandshake {
);
}
}
_ => {}
_ => {
rakrs_debug!(
true,
"[CLIENT] Received unknown packet from server!"
);
}
}
}
}
Expand All @@ -355,6 +371,25 @@ impl ClientHandshake {

Self { status: state }
}

pub(crate) async fn send_connection_request(send_q: &mut SendQueue, id: i64) -> std::io::Result<()> {
let connect_request = ConnectionRequest {
time: current_epoch() as i64,
client_id: id,
security: false,
};

if let Err(_) = send_q
.send_packet(connect_request.into(), Reliability::Reliable, true)
.await
{
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to send ConnectionRequest!",
));
}
return Ok(());
}
}

impl Future for ClientHandshake {
Expand Down
75 changes: 47 additions & 28 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,16 +369,38 @@ impl Client {
}
});

let recv_task = self.init_recv_task().await?;
let tick_task = self.init_connect_tick(send_queue.clone()).await?;
let recv_task = self.init_recv_task();
let tisk_task = self.init_connect_tick(send_queue.clone());

if let Err(e) = recv_task {
rakrs_debug!(true, "[CLIENT] Failed to start recv task: {:?}", e);
return Err(ClientError::Killed);
}

if let Err(e) = tisk_task {
rakrs_debug!(true, "[CLIENT] Failed to start connect tick task: {:?}", e);
return Err(ClientError::Killed);
}

let recv_task = recv_task.unwrap();
let tisk_task = tisk_task.unwrap();

if *self.state.lock().await != ConnectionState::Identified {
return Err(ClientError::AlreadyOnline);
}

self.update_state(ConnectionState::Connected).await;

let mut tasks = self.tasks.lock().await;

// Responsible for the raw socket
self.push_task(socket_task).await;
tasks.push(socket_task);
// Responsible for digesting messages from the network
self.push_task(recv_task).await;
tasks.push(recv_task);
// Responsible for sending packets to the server and keeping the connection alive
self.push_task(tick_task).await;
tasks.push(tisk_task);

rakrs_debug!("[CLIENT] Client is now connected!");
Ok(())
}

Expand Down Expand Up @@ -571,11 +593,7 @@ impl Client {
}
}

async fn push_task(&self, task: JoinHandle<()>) {
self.tasks.lock().await.push(task);
}

async fn init_recv_task(&self) -> Result<JoinHandle<()>, ClientError> {
fn init_recv_task(&self) -> Result<JoinHandle<()>, ClientError> {
let net_recv = match self.network_recv {
Some(ref n) => n.clone(),
None => {
Expand All @@ -598,7 +616,7 @@ impl Client {
let state = self.state.clone();
let recv_time = self.recv_time.clone();

let r = task::spawn(async move {
return Ok(task::spawn(async move {
'task_loop: loop {
#[cfg(feature = "async_std")]
let net_dispatch = net_recv.lock().await;
Expand Down Expand Up @@ -795,34 +813,29 @@ impl Client {
}
}
}
});
Ok(r)
}));
}

/// This is an internal function that initializes the client connection.
/// This is called by `Client::connect()`.
async fn init_connect_tick(
fn init_connect_tick(
&self,
send_queue: Arc<RwLock<SendQueue>>,
) -> Result<task::JoinHandle<()>, ClientError> {
// verify that the client is offline
if *self.state.lock().await != ConnectionState::Identified {
return Err(ClientError::AlreadyOnline);
}
self.update_state(ConnectionState::Connected).await;

let closer_dispatch = self.close_notifier.clone();
let recv_queue = self.recv_queue.clone();
let state = self.state.clone();
let last_recv = self.recv_time.clone();
let mut last_ping: u16 = 0;

let t = task::spawn(async move {
return Ok(task::spawn(async move {
loop {
let closer = closer_dispatch.lock().await;

macro_rules! tick_body {
() => {
rakrs_debug!(true, "[CLIENT] Running connect tick task");
let recv = last_recv.load(std::sync::atomic::Ordering::Relaxed);
let mut state = state.lock().await;

Expand All @@ -843,25 +856,32 @@ impl Client {
continue;
}

if (recv + 20000) <= current_epoch() {
if (recv + 20) <= current_epoch() {
*state = ConnectionState::Disconnected;
rakrs_debug!(true, "[CLIENT] Client timed out. Closing connection...");
closer.notify().await;
break;
}

if recv + 15000 <= current_epoch() && state.is_reliable() {
let mut send_q = send_queue.write().await;
let mut recv_q = recv_queue.lock().await;

if recv + 10 <= current_epoch() && state.is_reliable() {
*state = ConnectionState::TimingOut;
rakrs_debug!(
true,
"[CLIENT] Connection is timing out, sending a ping!",
);
let ping = ConnectedPing {
time: current_epoch() as i64,
};
if let Ok(_) = send_q
.send_packet(ping.into(), Reliability::Reliable, true)
.await
{}
}

let mut send_q = send_queue.write().await;
let mut recv_q = recv_queue.lock().await;

if last_ping >= 3000 {
if last_ping >= 500 {
let ping = ConnectedPing {
time: current_epoch() as i64,
};
Expand Down Expand Up @@ -920,8 +940,7 @@ impl Client {
}
}
}
});
Ok(t)
}));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Connection {
break;
}

if recv + 20000 <= current_epoch() {
if recv + 15 <= current_epoch() {
*cstate = ConnectionState::Disconnected;
rakrs_debug!(
true,
Expand All @@ -270,7 +270,7 @@ impl Connection {
break;
}

if recv + 15000 <= current_epoch() && cstate.is_reliable() {
if recv + 10 <= current_epoch() && cstate.is_reliable() {
*cstate = ConnectionState::TimingOut;
rakrs_debug!(
true,
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/packet/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Reader<ConnectionAccept> for ConnectionAccept {

for _ in 0..20 {
// we only have the request time and timestamp left...
if buf.as_slice().len() < 16 {
if buf.as_slice().len() <= 16 {
break;
}
internal_ids.push(buf.read_type::<SocketAddr>()?);
Expand Down
12 changes: 9 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ impl Listener {
let mut buf: [u8; 2048] = [0; 2048];
#[cfg(feature = "mcpe")]
let motd_default = default_motd.clone();

loop {
let length: usize;
let origin: SocketAddr;
Expand All @@ -357,8 +356,15 @@ impl Listener {
origin = o;
}
Err(e) => {
rakrs_debug!(true, "Error: {:?}", e);
continue;
match e.kind() {
std::io::ErrorKind::ConnectionReset => {
continue;
},
_ => {
rakrs_debug!(true, "[SERVER-SOCKET] Failed to recieve packet! {}", e);
continue;
}
}
}
}

Expand Down

0 comments on commit 496596a

Please sign in to comment.