From 2e0276ad74c9de9e5f389629f1b11dfd67726dac Mon Sep 17 00:00:00 2001 From: BakerNet Date: Fri, 30 Aug 2024 22:52:15 -0700 Subject: [PATCH] Misc refactor --- web/src/app/minesweeper/game.rs | 3 +- web/src/backend.rs | 1 + web/src/backend/app.rs | 6 +- web/src/backend/game_manager.rs | 142 +------------------------------- web/src/backend/websocket.rs | 141 +++++++++++++++++++++++++++++++ web/src/messages.rs | 2 +- 6 files changed, 150 insertions(+), 145 deletions(-) create mode 100644 web/src/backend/websocket.rs diff --git a/web/src/app/minesweeper/game.rs b/web/src/app/minesweeper/game.rs index 917c0a6..8597817 100644 --- a/web/src/app/minesweeper/game.rs +++ b/web/src/app/minesweeper/game.rs @@ -313,13 +313,12 @@ where let (game_signal, _) = create_signal(game); - let game_id = game_info.game_id.clone(); create_effect(move |_| { log::debug!("before ready_state"); let state = ready_state(); if state == ConnectionReadyState::Open { log::debug!("ready_state Open"); - game_signal().send(ClientMessage::Join(game_id.clone())); + game_signal().send(ClientMessage::Join); } else if state == ConnectionReadyState::Closed { log::debug!("ready_state Closed"); refetch(); diff --git a/web/src/backend.rs b/web/src/backend.rs index 0c03881..2253c36 100644 --- a/web/src/backend.rs +++ b/web/src/backend.rs @@ -3,6 +3,7 @@ mod auth; mod fileserv; mod game_manager; mod users; +mod websocket; pub use app::App; pub use auth::{CSRF_STATE_KEY, NEXT_URL_KEY, OAUTH_TARGET}; diff --git a/web/src/backend/app.rs b/web/src/backend/app.rs index 8938273..257b4bd 100644 --- a/web/src/backend/app.rs +++ b/web/src/backend/app.rs @@ -23,8 +23,8 @@ use tower_sessions_sqlx_store::SqliteStore; use crate::{app::App as FrontendApp, app::OAuthTarget, models::game::Game}; use super::{ - auth, auth::REDIRECT_URL, fileserv::file_and_error_handler, game_manager, - game_manager::GameManager, users, users::AuthSession, + auth, auth::REDIRECT_URL, fileserv::file_and_error_handler, game_manager::GameManager, users, + users::AuthSession, websocket, }; /// This takes advantage of Axum's SubStates feature by deriving FromRef. This is the only way to have more than one @@ -209,7 +209,7 @@ impl App { .leptos_routes_with_handler(routes, get(leptos_routes_handler)) .fallback(file_and_error_handler) .merge(auth::router()) - .merge(game_manager::router()) + .merge(websocket::router()) .layer(auth_service) .with_state(app_state); (app, addr) diff --git a/web/src/backend/game_manager.rs b/web/src/backend/game_manager.rs index 2a760cc..ce6e4b9 100644 --- a/web/src/backend/game_manager.rs +++ b/web/src/backend/game_manager.rs @@ -1,17 +1,7 @@ -#![allow(dead_code)] use ::chrono::{DateTime, Utc}; use anyhow::{anyhow, bail, Result}; -use axum::{ - extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - Path, State, - }, - response::IntoResponse, - routing::get, - Router, -}; -use futures::{sink::SinkExt, stream::SplitSink, StreamExt}; -use http::StatusCode; +use axum::extract::ws::{Message, WebSocket}; +use futures::{sink::SinkExt, stream::SplitSink}; use minesweeper_lib::{ cell::PlayerCell, client::ClientPlayer, @@ -33,12 +23,6 @@ use crate::{ }, }; -use super::{app::AppState, users::AuthSession}; - -pub fn router() -> Router { - Router::::new().route("/api/websocket/game/:id", get(websocket_handler)) -} - #[derive(Clone, Debug)] struct PlayerHandle { user_id: Option, @@ -300,7 +284,7 @@ impl GameManager { Ok(()) } - async fn was_playing(&self, game_id: &str, user: &Option) -> bool { + pub async fn was_playing(&self, game_id: &str, user: &Option) -> bool { if user.is_none() { return false; } @@ -563,123 +547,3 @@ impl GameHandler { } } } - -pub async fn websocket_handler( - ws: WebSocketUpgrade, - auth_session: AuthSession, - Path(game_id): Path, - State(app_state): State, -) -> impl IntoResponse { - if !app_state.game_manager.game_exists(&game_id).await - || !app_state.game_manager.game_is_active(&game_id).await - { - return StatusCode::BAD_REQUEST.into_response(); - } - ws.on_upgrade(|socket| websocket(socket, auth_session.user, game_id, app_state.game_manager)) -} - -// This function deals with a single websocket connection, i.e., a single -// connected client / user, for which we will spawn two independent tasks (for -// receiving / sending chat messages). -pub async fn websocket( - stream: WebSocket, - user: Option, - game_id: String, - game_manager: GameManager, -) { - log::debug!("Websocket upgraded"); - // By splitting, we can send and receive at the same time. - let (sender, mut receiver) = stream.split(); - let sender = Arc::new(Mutex::new(sender)); - - let game_id = game_id.as_str(); - - let sender_clone = Arc::clone(&sender); - let mut rx = game_manager - .join_game(game_id, sender_clone) - .await - .unwrap_or_else(|_| panic!("Failed to join game ({}) from websocket", game_id)); - - let sender_clone = Arc::clone(&sender); - // Spawn the first task that will receive broadcast messages and send text - // messages over the websocket to our client. - let mut send_task = tokio::spawn(async move { - while let Ok(msg) = rx.recv().await { - // In any websocket error, break loop. - if sender_clone - .lock() - .await - .send(Message::Text(msg)) - .await - .is_err() - { - break; - } - } - }); - - let mut game_sender = None; - if game_manager.was_playing(game_id, &user).await { - let resp = game_manager - .play_game(game_id, &user, Arc::clone(&sender)) - .await; - match resp { - Ok(tx) => { - game_sender = Some(tx); - } - Err(e) => { - log::error!("Error playing game: {}", e) - } - } - } else { - loop { - tokio::select! { - _ = (&mut send_task) => break, - recvd = receiver.next() => { - match recvd { - Some(Ok(Message::Text(msg))) => { - let client_message = serde_json::from_str::(&msg); - match &client_message { - Ok(ClientMessage::PlayGame) => { - log::debug!("Trying to Play"); - let resp = game_manager.play_game(game_id, &user, Arc::clone(&sender)).await; - match resp { - Ok(tx) => { - game_sender = Some(tx); - break; - }, - Err(e) => {log::error!("Error playing game: {}", e)}, - } - } - _ => log::debug!("Non PlayGame message: {:?}: {:?}", client_message, msg), - } - } - _ => break, - } - }, - } - } - } - - let game_sender = if let Some(game_sender) = game_sender { - game_sender - } else { - let _ = send_task.await; - return; - }; - - // Spawn a task that takes messages from the websocket and sends them to the game handler - let mut recv_task = tokio::spawn(async move { - while let Some(Ok(Message::Text(text))) = receiver.next().await { - if game_sender.send(text).await.is_err() { - return; - } - } - }); - - // If any one of the tasks run to completion, we abort the other. - tokio::select! { - _ = (&mut send_task) => recv_task.abort(), - _ = (&mut recv_task) => send_task.abort(), - }; -} diff --git a/web/src/backend/websocket.rs b/web/src/backend/websocket.rs new file mode 100644 index 0000000..06ba6b5 --- /dev/null +++ b/web/src/backend/websocket.rs @@ -0,0 +1,141 @@ +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, State, + }, + response::IntoResponse, + routing::get, + Router, +}; +use futures::{sink::SinkExt, StreamExt}; +use http::StatusCode; +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::{messages::ClientMessage, models::user::User}; + +use super::{app::AppState, game_manager::GameManager, users::AuthSession}; + +pub fn router() -> Router { + Router::::new().route("/api/websocket/game/:id", get(websocket_handler)) +} + +pub async fn websocket_handler( + ws: WebSocketUpgrade, + auth_session: AuthSession, + Path(game_id): Path, + State(app_state): State, +) -> impl IntoResponse { + if !app_state.game_manager.game_exists(&game_id).await + || !app_state.game_manager.game_is_active(&game_id).await + { + return StatusCode::BAD_REQUEST.into_response(); + } + ws.on_upgrade(|socket| websocket(socket, auth_session.user, game_id, app_state.game_manager)) +} + +// This function deals with a single websocket connection, i.e., a single +// connected client / user, for which we will spawn two independent tasks (for +// receiving / sending chat messages). +pub async fn websocket( + stream: WebSocket, + user: Option, + game_id: String, + game_manager: GameManager, +) { + log::debug!("Websocket upgraded"); + // By splitting, we can send and receive at the same time. + let (sender, mut receiver) = stream.split(); + let sender = Arc::new(Mutex::new(sender)); + + let game_id = game_id.as_str(); + + let sender_clone = Arc::clone(&sender); + let mut rx = game_manager + .join_game(game_id, sender_clone) + .await + .unwrap_or_else(|_| panic!("Failed to join game ({}) from websocket", game_id)); + + let sender_clone = Arc::clone(&sender); + // Spawn the first task that will receive broadcast messages and send text + // messages over the websocket to our client. + let mut send_task = tokio::spawn(async move { + while let Ok(msg) = rx.recv().await { + // In any websocket error, break loop. + if sender_clone + .lock() + .await + .send(Message::Text(msg)) + .await + .is_err() + { + break; + } + } + }); + + let mut game_sender = None; + if game_manager.was_playing(game_id, &user).await { + let resp = game_manager + .play_game(game_id, &user, Arc::clone(&sender)) + .await; + match resp { + Ok(tx) => { + game_sender = Some(tx); + } + Err(e) => { + log::error!("Error playing game: {}", e) + } + } + } else { + loop { + tokio::select! { + _ = (&mut send_task) => break, + recvd = receiver.next() => { + match recvd { + Some(Ok(Message::Text(msg))) => { + let client_message = serde_json::from_str::(&msg); + match &client_message { + Ok(ClientMessage::PlayGame) => { + log::debug!("Trying to Play"); + let resp = game_manager.play_game(game_id, &user, Arc::clone(&sender)).await; + match resp { + Ok(tx) => { + game_sender = Some(tx); + break; + }, + Err(e) => {log::error!("Error playing game: {}", e)}, + } + } + _ => log::debug!("Non PlayGame message: {:?}: {:?}", client_message, msg), + } + } + _ => break, + } + }, + } + } + } + + let game_sender = if let Some(game_sender) = game_sender { + game_sender + } else { + let _ = send_task.await; + return; + }; + + // Spawn a task that takes messages from the websocket and sends them to the game handler + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(Message::Text(text))) = receiver.next().await { + if game_sender.send(text).await.is_err() { + return; + } + } + }); + + // If any one of the tasks run to completion, we abort the other. + tokio::select! { + _ = (&mut send_task) => recv_task.abort(), + _ = (&mut recv_task) => send_task.abort(), + }; +} diff --git a/web/src/messages.rs b/web/src/messages.rs index 555fdd2..ae296ab 100644 --- a/web/src/messages.rs +++ b/web/src/messages.rs @@ -41,7 +41,7 @@ impl FromStr for GameMessage { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "client_message", content = "data")] pub enum ClientMessage { - Join(String), + Join, PlayGame, Play(Play), }