Skip to content

Commit

Permalink
Misc refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
BakerNet committed Aug 31, 2024
1 parent 4b7c2d9 commit 2e0276a
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 145 deletions.
3 changes: 1 addition & 2 deletions web/src/app/minesweeper/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,12 @@ where

let (game_signal, _) = create_signal(game);

let game_id = game_info.game_id.clone();
create_effect(move |_| {
log::debug!("before ready_state");
let state = ready_state();
if state == ConnectionReadyState::Open {
log::debug!("ready_state Open");
game_signal().send(ClientMessage::Join(game_id.clone()));
game_signal().send(ClientMessage::Join);
} else if state == ConnectionReadyState::Closed {
log::debug!("ready_state Closed");
refetch();
Expand Down
1 change: 1 addition & 0 deletions web/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod auth;
mod fileserv;
mod game_manager;
mod users;
mod websocket;

pub use app::App;
pub use auth::{CSRF_STATE_KEY, NEXT_URL_KEY, OAUTH_TARGET};
Expand Down
6 changes: 3 additions & 3 deletions web/src/backend/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use tower_sessions_sqlx_store::SqliteStore;
use crate::{app::App as FrontendApp, app::OAuthTarget, models::game::Game};

use super::{
auth, auth::REDIRECT_URL, fileserv::file_and_error_handler, game_manager,
game_manager::GameManager, users, users::AuthSession,
auth, auth::REDIRECT_URL, fileserv::file_and_error_handler, game_manager::GameManager, users,
users::AuthSession, websocket,
};

/// This takes advantage of Axum's SubStates feature by deriving FromRef. This is the only way to have more than one
Expand Down Expand Up @@ -209,7 +209,7 @@ impl App {
.leptos_routes_with_handler(routes, get(leptos_routes_handler))
.fallback(file_and_error_handler)
.merge(auth::router())
.merge(game_manager::router())
.merge(websocket::router())
.layer(auth_service)
.with_state(app_state);
(app, addr)
Expand Down
142 changes: 3 additions & 139 deletions web/src/backend/game_manager.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
#![allow(dead_code)]
use ::chrono::{DateTime, Utc};
use anyhow::{anyhow, bail, Result};
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::IntoResponse,
routing::get,
Router,
};
use futures::{sink::SinkExt, stream::SplitSink, StreamExt};
use http::StatusCode;
use axum::extract::ws::{Message, WebSocket};
use futures::{sink::SinkExt, stream::SplitSink};
use minesweeper_lib::{
cell::PlayerCell,
client::ClientPlayer,
Expand All @@ -33,12 +23,6 @@ use crate::{
},
};

use super::{app::AppState, users::AuthSession};

pub fn router() -> Router<AppState> {
Router::<AppState>::new().route("/api/websocket/game/:id", get(websocket_handler))
}

#[derive(Clone, Debug)]
struct PlayerHandle {
user_id: Option<i64>,
Expand Down Expand Up @@ -300,7 +284,7 @@ impl GameManager {
Ok(())
}

async fn was_playing(&self, game_id: &str, user: &Option<User>) -> bool {
pub async fn was_playing(&self, game_id: &str, user: &Option<User>) -> bool {
if user.is_none() {
return false;
}
Expand Down Expand Up @@ -563,123 +547,3 @@ impl GameHandler {
}
}
}

pub async fn websocket_handler(
ws: WebSocketUpgrade,
auth_session: AuthSession,
Path(game_id): Path<String>,
State(app_state): State<AppState>,
) -> impl IntoResponse {
if !app_state.game_manager.game_exists(&game_id).await
|| !app_state.game_manager.game_is_active(&game_id).await
{
return StatusCode::BAD_REQUEST.into_response();
}
ws.on_upgrade(|socket| websocket(socket, auth_session.user, game_id, app_state.game_manager))
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending chat messages).
pub async fn websocket(
stream: WebSocket,
user: Option<User>,
game_id: String,
game_manager: GameManager,
) {
log::debug!("Websocket upgraded");
// By splitting, we can send and receive at the same time.
let (sender, mut receiver) = stream.split();
let sender = Arc::new(Mutex::new(sender));

let game_id = game_id.as_str();

let sender_clone = Arc::clone(&sender);
let mut rx = game_manager
.join_game(game_id, sender_clone)
.await
.unwrap_or_else(|_| panic!("Failed to join game ({}) from websocket", game_id));

let sender_clone = Arc::clone(&sender);
// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
if sender_clone
.lock()
.await
.send(Message::Text(msg))
.await
.is_err()
{
break;
}
}
});

let mut game_sender = None;
if game_manager.was_playing(game_id, &user).await {
let resp = game_manager
.play_game(game_id, &user, Arc::clone(&sender))
.await;
match resp {
Ok(tx) => {
game_sender = Some(tx);
}
Err(e) => {
log::error!("Error playing game: {}", e)
}
}
} else {
loop {
tokio::select! {
_ = (&mut send_task) => break,
recvd = receiver.next() => {
match recvd {
Some(Ok(Message::Text(msg))) => {
let client_message = serde_json::from_str::<ClientMessage>(&msg);
match &client_message {
Ok(ClientMessage::PlayGame) => {
log::debug!("Trying to Play");
let resp = game_manager.play_game(game_id, &user, Arc::clone(&sender)).await;
match resp {
Ok(tx) => {
game_sender = Some(tx);
break;
},
Err(e) => {log::error!("Error playing game: {}", e)},
}
}
_ => log::debug!("Non PlayGame message: {:?}: {:?}", client_message, msg),
}
}
_ => break,
}
},
}
}
}

let game_sender = if let Some(game_sender) = game_sender {
game_sender
} else {
let _ = send_task.await;
return;
};

// Spawn a task that takes messages from the websocket and sends them to the game handler
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
if game_sender.send(text).await.is_err() {
return;
}
}
});

// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
}
141 changes: 141 additions & 0 deletions web/src/backend/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::IntoResponse,
routing::get,
Router,
};
use futures::{sink::SinkExt, StreamExt};
use http::StatusCode;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::{messages::ClientMessage, models::user::User};

use super::{app::AppState, game_manager::GameManager, users::AuthSession};

pub fn router() -> Router<AppState> {
Router::<AppState>::new().route("/api/websocket/game/:id", get(websocket_handler))
}

pub async fn websocket_handler(
ws: WebSocketUpgrade,
auth_session: AuthSession,
Path(game_id): Path<String>,
State(app_state): State<AppState>,
) -> impl IntoResponse {
if !app_state.game_manager.game_exists(&game_id).await
|| !app_state.game_manager.game_is_active(&game_id).await
{
return StatusCode::BAD_REQUEST.into_response();
}
ws.on_upgrade(|socket| websocket(socket, auth_session.user, game_id, app_state.game_manager))
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending chat messages).
pub async fn websocket(
stream: WebSocket,
user: Option<User>,
game_id: String,
game_manager: GameManager,
) {
log::debug!("Websocket upgraded");
// By splitting, we can send and receive at the same time.
let (sender, mut receiver) = stream.split();
let sender = Arc::new(Mutex::new(sender));

let game_id = game_id.as_str();

let sender_clone = Arc::clone(&sender);
let mut rx = game_manager
.join_game(game_id, sender_clone)
.await
.unwrap_or_else(|_| panic!("Failed to join game ({}) from websocket", game_id));

let sender_clone = Arc::clone(&sender);
// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
if sender_clone
.lock()
.await
.send(Message::Text(msg))
.await
.is_err()
{
break;
}
}
});

let mut game_sender = None;
if game_manager.was_playing(game_id, &user).await {
let resp = game_manager
.play_game(game_id, &user, Arc::clone(&sender))
.await;
match resp {
Ok(tx) => {
game_sender = Some(tx);
}
Err(e) => {
log::error!("Error playing game: {}", e)
}
}
} else {
loop {
tokio::select! {
_ = (&mut send_task) => break,
recvd = receiver.next() => {
match recvd {
Some(Ok(Message::Text(msg))) => {
let client_message = serde_json::from_str::<ClientMessage>(&msg);
match &client_message {
Ok(ClientMessage::PlayGame) => {
log::debug!("Trying to Play");
let resp = game_manager.play_game(game_id, &user, Arc::clone(&sender)).await;
match resp {
Ok(tx) => {
game_sender = Some(tx);
break;
},
Err(e) => {log::error!("Error playing game: {}", e)},
}
}
_ => log::debug!("Non PlayGame message: {:?}: {:?}", client_message, msg),
}
}
_ => break,
}
},
}
}
}

let game_sender = if let Some(game_sender) = game_sender {
game_sender
} else {
let _ = send_task.await;
return;
};

// Spawn a task that takes messages from the websocket and sends them to the game handler
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
if game_sender.send(text).await.is_err() {
return;
}
}
});

// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
}
2 changes: 1 addition & 1 deletion web/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl FromStr for GameMessage {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "client_message", content = "data")]
pub enum ClientMessage {
Join(String),
Join,
PlayGame,
Play(Play),
}

0 comments on commit 2e0276a

Please sign in to comment.