Skip to content

Commit

Permalink
fix bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
k2d222 committed Oct 26, 2024
1 parent 2699d93 commit c018927
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 59 deletions.
60 changes: 30 additions & 30 deletions server/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio_tungstenite::tungstenite::protocol::{frame::coding::CloseCode, CloseFrame};
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

use crate::{error::Error, protocol::*, room::Peer, server::Server, util::*};
use crate::{error::Error, protocol::*, server::Server, server::User, util::*};

use futures_util::{future::Either, stream};
use tokio_tungstenite::connect_async;
Expand All @@ -21,7 +21,7 @@ pub struct Bridge {
pub key: String,
pub map: String,
pub server_tx: Tx,
pub peers_tx: HashMap<SocketAddr, Tx>,
pub users_tx: HashMap<SocketAddr, Tx>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -51,7 +51,7 @@ impl Server {
.find(|(_, v)| v.key == key)
.ok_or(Error::BridgeNotFound)?;

bridge.peers_tx.insert(addr, ws_send);
bridge.users_tx.insert(addr, ws_send);
bridge_addr.clone()
};

Expand Down Expand Up @@ -93,7 +93,7 @@ impl Server {
Either::Right((res, _)) => res,
};

// remove the peer, send logout
// remove the user, send logout
|| -> Option<()> {
let mut bridges = self.remote_bridges.write().unwrap();
let bridge = bridges.get_mut(&bridge_addr)?;
Expand All @@ -110,7 +110,7 @@ impl Server {
.unbounded_send(WebSocketMessage::Text(logout_msg))
.ok();

bridge.peers_tx.remove(&addr);
bridge.users_tx.remove(&addr);
Some(())
}();

Expand Down Expand Up @@ -138,14 +138,14 @@ impl Server {
key: cfg.key,
map: cfg.map,
server_tx: ws_send,
peers_tx: Default::default(),
users_tx: Default::default(),
};

self.remote_bridges.write().unwrap().insert(addr, bridge);

log::info!("remote server {addr} started bridging");

// forward all messages to the right peer
// forward all messages to the right user
let fut_recv = ws_recv
.try_chunks(2)
.map_err(|_| Error::BridgeClosed)
Expand All @@ -155,13 +155,13 @@ impl Server {
_ => return futures::future::err(Error::BridgeClosed),
};
let res = move || -> Option<()> {
let peer_addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?;
let user_addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?;
self.remote_bridges
.read()
.unwrap()
.get(&addr)?
.peers_tx
.get(&peer_addr)
.users_tx
.get(&user_addr)
.map(|tx| {
tx.unbounded_send(WebSocketMessage::Text(payload_msg)).ok();
});
Expand Down Expand Up @@ -200,7 +200,7 @@ impl Server {
let (tx, rx) = unbounded();
let fut_send = rx.map(Ok).forward(ws_send);

let mut bridge_peers: HashMap<SocketAddr, Peer> = Default::default();
let mut bridge_users: HashMap<String, Arc<User>> = Default::default();

let server_2 = server.clone();

Expand All @@ -214,13 +214,13 @@ impl Server {
};

let res = || -> Option<()> {
let addr: SocketAddr = serde_json::from_str(&addr_msg).ok()?;
let peer = match bridge_peers.get_mut(&addr) {
Some(peer) => peer,
let token: String = serde_json::from_str(&addr_msg).ok()?;
let user = match bridge_users.get(&token).cloned() {
Some(user) => user,
None => {
let (peer_send, peer_recv) = unbounded();
let (user_send, user_recv) = unbounded();
let addr_msg = addr_msg.clone();
let fut = peer_recv
let fut = user_recv
.flat_map(move |payload| {
stream::iter(vec![
WebSocketMessage::Text(addr_msg.clone()),
Expand All @@ -247,21 +247,21 @@ impl Server {
.map(Ok)
.forward(tx.clone());
tokio::spawn(fut);
let peer = Peer::new(addr, peer_send);
bridge_peers.insert(addr, peer);
bridge_peers.get_mut(&addr).unwrap()
let user = Arc::new(User::new(token.clone(), user_send));
bridge_users.insert(token.clone(), user.clone());
user
}
};

let pkt: Result<RecvPacket, _> = serde_json::from_str(&payload_msg);
match pkt {
Ok(pkt) => match &pkt.content {
Request::JoinMap(map) => {
if map == &cfg.map {
server.handle_request(peer, pkt);
Request::JoinMap(join) => {
if join.name == cfg.map {
server.handle_request(user, pkt);
} else {
Server::send(
&peer,
&user,
pkt.id,
Message::Response(Err(Error::MapNotFound)),
);
Expand All @@ -270,14 +270,14 @@ impl Server {
Request::ListMaps => {
let mut maps = server.get_maps();
maps.retain(|m| m.name == cfg.map);
server.do_respond(peer, &pkt, Ok(Response::Maps(maps)));
server.do_broadcast(&user, &pkt);
}
_ => server.handle_request(peer, pkt),
_ => server.handle_request(user, pkt),
},
Err(e) => {
log::error!("failed to parse message: {e} in {payload_msg}");
Server::send(
&peer,
&user,
None,
Message::Response(Err(Error::BadRequest(e.to_string()))),
)
Expand All @@ -302,18 +302,18 @@ impl Server {
});

// store the task, so it can be cancelled.
*server_2.bridge.lock().unwrap() = Some(handle);
*server_2.bridge.lock() = Some(handle);

Ok(())
}

pub fn close_bridge(&self) {
let mut val = self.bridge.lock().unwrap();
let mut bridge = self.bridge.lock();

if let Some(handle) = val.as_ref() {
if let Some(handle) = bridge.as_ref() {
handle.abort();
log::info!("bridge aborted");
*val = None;
*bridge = None;
}
}
}
2 changes: 1 addition & 1 deletion server/src/bridge_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) async fn bridge_oneshot(
.values_mut()
.find(|v| v.key == key)
.ok_or(Error::BridgeNotFound)?;
bridge.peers_tx.insert(addr, http_tx);
bridge.users_tx.insert(addr, http_tx);
}
{
let bridges = server.remote_bridges.read().unwrap();
Expand Down
19 changes: 2 additions & 17 deletions server/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use axum_extra::{
};
use axum_server::tls_rustls::RustlsConfig;

use futures::{channel::mpsc::unbounded, StreamExt};
use rand::Rng;
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
use tower_http::{
Expand All @@ -30,21 +29,6 @@ use vek::num_traits::clamp;
use crate::{base64::Base64, error::Error, protocol::*, server::User, util::timestamp_now};
use crate::{Cli, Server};

lazy_static::lazy_static! {
static ref NAMELESS_TEE: Arc<User> = {
let name = "nameless tee".to_string();
let token = name.clone();
let (tx, rx) = unbounded();
tokio::spawn(async move {
rx.for_each(|v| {
log::debug!("message to nameless tee: {v:?}");
futures::future::ready(())
})
});
Arc::new(User::new(name, token, tx))
};
}

pub struct Router {
addr: SocketAddr,
router: axum::Router,
Expand Down Expand Up @@ -237,7 +221,8 @@ fn ensure_access_authorized(user: Option<&User>, map: &str, server: &Server) ->
if !authorized {
log::debug!(
"unauthorized: `{map}` for {}",
user.unwrap_or(&NAMELESS_TEE).token
user.map(|user| user.token.as_str())
.unwrap_or("nameless tee")
);
}
authorized.then_some(()).ok_or(Error::Unauthorized)
Expand Down
21 changes: 10 additions & 11 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ use crate::{
};

#[cfg(feature = "bridge")]
use tokio::task::JoinHandle;

use crate::bridge::Bridge;
#[cfg(feature = "bridge")]
use std::net::SocketAddr;
#[cfg(feature = "bridge")]
use std::sync::RwLock;

#[cfg(feature = "bridge")]
use crate::bridge::Bridge;
use tokio::task::JoinHandle;

type Tx = UnboundedSender<WebSocketMessage>;

pub struct User {
pub name: String,
pub name: Option<String>,
pub token: String,
pub id: Uuid,
pub room: Mutex<Option<Arc<Room>>>,
Expand All @@ -46,10 +46,9 @@ pub struct User {
}

impl User {
pub fn new(name: String, token: String, tx: Tx) -> Self {
println!("new user {token}");
pub fn new(token: String, tx: Tx) -> Self {
User {
name,
name: None,
token,
id: Uuid::new_v4(),
room: Default::default(),
Expand All @@ -72,9 +71,9 @@ pub struct Server {
pub max_maps: usize,
pub max_map_size: usize, // in bytes
pub max_users: usize,
#[cfg(feature = "bridge")]
#[cfg(feature = "bridge_out")]
pub bridge: Mutex<Option<JoinHandle<()>>>,
#[cfg(feature = "bridge")]
#[cfg(feature = "bridge_in")]
pub remote_bridges: RwLock<HashMap<SocketAddr, Bridge>>,
}

Expand Down Expand Up @@ -356,7 +355,7 @@ impl Server {
let (ws_send, rx) = unbounded();
let fut_send = rx.map(Ok).forward(tx);

let user = Arc::new(User::new("nameless tee".into(), token.clone(), ws_send));
let user = Arc::new(User::new(token.clone(), ws_send));
{
let user_count = self.users().len();
log::debug!(
Expand Down

0 comments on commit c018927

Please sign in to comment.