diff --git a/server/src/bridge.rs b/server/src/bridge.rs index bf5dea2..7eba6c3 100644 --- a/server/src/bridge.rs +++ b/server/src/bridge.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame}; use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; -use crate::{error::Error, protocol::*, room::Peer, server::Server, util::*}; +use crate::{error::Error, protocol::*, server::Server, server::User, util::*}; use futures_util::{future::Either, stream}; use tokio_tungstenite::connect_async; @@ -21,7 +21,7 @@ pub struct Bridge { pub key: String, pub map: String, pub server_tx: Tx, - pub peers_tx: HashMap, + pub users_tx: HashMap, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -51,7 +51,7 @@ impl Server { .find(|(_, v)| v.key == key) .ok_or(Error::BridgeNotFound)?; - bridge.peers_tx.insert(addr, ws_send); + bridge.users_tx.insert(addr, ws_send); bridge_addr.clone() }; @@ -93,7 +93,7 @@ impl Server { Either::Right((res, _)) => res, }; - // remove the peer, send logout + // remove the user, send logout || -> Option<()> { let mut bridges = self.remote_bridges.write().unwrap(); let bridge = bridges.get_mut(&bridge_addr)?; @@ -110,7 +110,7 @@ impl Server { .unbounded_send(WebSocketMessage::Text(logout_msg)) .ok(); - bridge.peers_tx.remove(&addr); + bridge.users_tx.remove(&addr); Some(()) }(); @@ -138,14 +138,14 @@ impl Server { key: cfg.key, map: cfg.map, server_tx: ws_send, - peers_tx: Default::default(), + users_tx: Default::default(), }; self.remote_bridges.write().unwrap().insert(addr, bridge); log::info!("remote server {addr} started bridging"); - // forward all messages to the right peer + // forward all messages to the right user let fut_recv = ws_recv .try_chunks(2) .map_err(|_| Error::BridgeClosed) @@ -155,13 +155,13 @@ impl Server { _ => return futures::future::err(Error::BridgeClosed), }; let res = move || -> Option<()> { - let peer_addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?; + let user_addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?; self.remote_bridges .read() .unwrap() .get(&addr)? - .peers_tx - .get(&peer_addr) + .users_tx + .get(&user_addr) .map(|tx| { tx.unbounded_send(WebSocketMessage::Text(payload_msg)).ok(); }); @@ -200,7 +200,7 @@ impl Server { let (tx, rx) = unbounded(); let fut_send = rx.map(Ok).forward(ws_send); - let mut bridge_peers: HashMap = Default::default(); + let mut bridge_users: HashMap> = Default::default(); let server_2 = server.clone(); @@ -214,13 +214,13 @@ impl Server { }; let res = || -> Option<()> { - let addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?; - let peer = match bridge_peers.get_mut(&addr) { - Some(peer) => peer, + let token: String = serde_json::from_str(&addr_msg).ok()?; + let user = match bridge_users.get(&token).cloned() { + Some(user) => user, None => { - let (peer_send, peer_recv) = unbounded(); + let (user_send, user_recv) = unbounded(); let addr_msg = addr_msg.clone(); - let fut = peer_recv + let fut = user_recv .flat_map(move |payload| { stream::iter(vec![ WebSocketMessage::Text(addr_msg.clone()), @@ -247,21 +247,21 @@ impl Server { .map(Ok) .forward(tx.clone()); tokio::spawn(fut); - let peer = Peer::new(addr, peer_send); - bridge_peers.insert(addr, peer); - bridge_peers.get_mut(&addr).unwrap() + let user = Arc::new(User::new(token.clone(), user_send)); + bridge_users.insert(token.clone(), user.clone()); + user } }; let pkt: Result = serde_json::from_str(&payload_msg); match pkt { Ok(pkt) => match &pkt.content { - Request::JoinMap(map) => { - if map == &cfg.map { - server.handle_request(peer, pkt); + Request::JoinMap(join) => { + if join.name == cfg.map { + server.handle_request(user, pkt); } else { Server::send( - &peer, + &user, pkt.id, Message::Response(Err(Error::MapNotFound)), ); @@ -270,14 +270,14 @@ impl Server { Request::ListMaps => { let mut maps = server.get_maps(); maps.retain(|m| m.name == cfg.map); - server.do_respond(peer, &pkt, Ok(Response::Maps(maps))); + server.do_broadcast(&user, &pkt); } - _ => server.handle_request(peer, pkt), + _ => server.handle_request(user, pkt), }, Err(e) => { log::error!("failed to parse message: {e} in {payload_msg}"); Server::send( - &peer, + &user, None, Message::Response(Err(Error::BadRequest(e.to_string()))), ) @@ -302,18 +302,18 @@ impl Server { }); // store the task, so it can be cancelled. - *server_2.bridge.lock().unwrap() = Some(handle); + *server_2.bridge.lock() = Some(handle); Ok(()) } pub fn close_bridge(&self) { - let mut val = self.bridge.lock().unwrap(); + let mut bridge = self.bridge.lock(); - if let Some(handle) = val.as_ref() { + if let Some(handle) = bridge.as_ref() { handle.abort(); log::info!("bridge aborted"); - *val = None; + *bridge = None; } } } diff --git a/server/src/bridge_router.rs b/server/src/bridge_router.rs index 3c0c1b2..b537e1a 100644 --- a/server/src/bridge_router.rs +++ b/server/src/bridge_router.rs @@ -42,7 +42,7 @@ pub(crate) async fn bridge_oneshot( .values_mut() .find(|v| v.key == key) .ok_or(Error::BridgeNotFound)?; - bridge.peers_tx.insert(addr, http_tx); + bridge.users_tx.insert(addr, http_tx); } { let bridges = server.remote_bridges.read().unwrap(); diff --git a/server/src/router.rs b/server/src/router.rs index f94cd1c..63811db 100644 --- a/server/src/router.rs +++ b/server/src/router.rs @@ -18,7 +18,6 @@ use axum_extra::{ }; use axum_server::tls_rustls::RustlsConfig; -use futures::{channel::mpsc::unbounded, StreamExt}; use rand::Rng; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{ @@ -30,21 +29,6 @@ use vek::num_traits::clamp; use crate::{base64::Base64, error::Error, protocol::*, server::User, util::timestamp_now}; use crate::{Cli, Server}; -lazy_static::lazy_static! { - static ref NAMELESS_TEE: Arc = { - let name = "nameless tee".to_string(); - let token = name.clone(); - let (tx, rx) = unbounded(); - tokio::spawn(async move { - rx.for_each(|v| { - log::debug!("message to nameless tee: {v:?}"); - futures::future::ready(()) - }) - }); - Arc::new(User::new(name, token, tx)) - }; -} - pub struct Router { addr: SocketAddr, router: axum::Router, @@ -237,7 +221,8 @@ fn ensure_access_authorized(user: Option<&User>, map: &str, server: &Server) -> if !authorized { log::debug!( "unauthorized: `{map}` for {}", - user.unwrap_or(&NAMELESS_TEE).token + user.map(|user| user.token.as_str()) + .unwrap_or("nameless tee") ); } authorized.then_some(()).ok_or(Error::Unauthorized) diff --git a/server/src/server.rs b/server/src/server.rs index 479488e..29b6aff 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -26,18 +26,18 @@ use crate::{ }; #[cfg(feature = "bridge")] -use tokio::task::JoinHandle; - +use crate::bridge::Bridge; +#[cfg(feature = "bridge")] +use std::net::SocketAddr; #[cfg(feature = "bridge")] use std::sync::RwLock; - #[cfg(feature = "bridge")] -use crate::bridge::Bridge; +use tokio::task::JoinHandle; type Tx = UnboundedSender; pub struct User { - pub name: String, + pub name: Option, pub token: String, pub id: Uuid, pub room: Mutex>>, @@ -46,10 +46,9 @@ pub struct User { } impl User { - pub fn new(name: String, token: String, tx: Tx) -> Self { - println!("new user {token}"); + pub fn new(token: String, tx: Tx) -> Self { User { - name, + name: None, token, id: Uuid::new_v4(), room: Default::default(), @@ -72,9 +71,9 @@ pub struct Server { pub max_maps: usize, pub max_map_size: usize, // in bytes pub max_users: usize, - #[cfg(feature = "bridge")] + #[cfg(feature = "bridge_out")] pub bridge: Mutex>>, - #[cfg(feature = "bridge")] + #[cfg(feature = "bridge_in")] pub remote_bridges: RwLock>, } @@ -356,7 +355,7 @@ impl Server { let (ws_send, rx) = unbounded(); let fut_send = rx.map(Ok).forward(tx); - let user = Arc::new(User::new("nameless tee".into(), token.clone(), ws_send)); + let user = Arc::new(User::new(token.clone(), ws_send)); { let user_count = self.users().len(); log::debug!(