Skip to content

Commit

Permalink
Fix minor bug: client logout message didn't cause WS connection to cl…
Browse files Browse the repository at this point in the history
…ose by server.
  • Loading branch information
elonen committed Oct 19, 2023
1 parent ff5061c commit 28c6b4b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
26 changes: 15 additions & 11 deletions server/src/api_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod tests;
mod file_upload;
use file_upload::handle_multipart_upload;

use crate::api_server::ws_handers::SessionClose;
use crate::database::{models, DB};
use crate::video_pipeline::IncomingFile;

Expand Down Expand Up @@ -80,10 +81,10 @@ pub struct WsSessionArgs<'a> {
impl WsSessionArgs<'_> {

/// Send a command to client websocket(s).
///
///
/// If send_to is a string, it is interpreted either as a video hash or user id. Returns the
/// number of sessions the message was sent to.
///
///
/// - If it turns out to be a video hash, the message is sent to all websocket
/// that are watching it.
/// - If it's a user id, the message is sent to all websocket connections that user has open.
Expand All @@ -102,7 +103,7 @@ impl WsSessionArgs<'_> {
SendTo::MsgSender(sender) => { sender.send(msg)?; Ok(1u32) },
}
}

pub fn push_notify_message(&self, msg: &models::MessageInsert, persist: bool) -> Res<()> {
let send_res = self.emit_cmd("message", &msg.to_json()?, SendTo::UserId(&msg.user_id));
if let Ok(sent_count) = send_res {
Expand Down Expand Up @@ -137,7 +138,7 @@ impl WsSessionArgs<'_> {
}
}
let mut fields = c.to_json()?;
fields["comment_id"] = fields["id"].take(); // swap id with comment_id, because the client expects comment_id
fields["comment_id"] = fields["id"].take(); // swap id with comment_id, because the client expects comment_id
self.emit_cmd("new_comment", &fields , send_to).map(|_| ())
}

Expand Down Expand Up @@ -175,8 +176,8 @@ async fn handle_ws_session(
let (mut ws_tx, mut ws_rx) = ws.split();

// Let the client know user's id and name
if let Err(e) = ses.emit_cmd("welcome",
&serde_json::json!({ "user_id": user_id, "username": username }),
if let Err(e) = ses.emit_cmd("welcome",
&serde_json::json!({ "user_id": user_id, "username": username }),
SendTo::CurSession()) {
tracing::error!(details=%e, "Error sending welcome message. Closing session.");
return;
Expand Down Expand Up @@ -223,7 +224,7 @@ async fn handle_ws_session(
// Check data fields for length. Only "drawing" is allowed to be long.
for (k, v) in data.as_object().unwrap_or(&serde_json::Map::new()) {
if k != "drawing" && v.as_str().map(|s| s.len() > 2048).unwrap_or(false) { bail!("Field too long"); }
}
}
Ok((cmd, data))
}

Expand All @@ -242,7 +243,10 @@ async fn handle_ws_session(
tracing::debug!(cmd=%cmd, "Msg from client.");

if let Err(e) = msg_dispatch(&cmd, &data, &mut ses).await {
if let Some(e) = e.downcast_ref::<tokio::sync::mpsc::error::SendError<Message>>() {
if let Some(e) = e.downcast_ref::<SessionClose>() {
if !matches!(e, SessionClose::Logout) { tracing::info!("[{}] Closing session: {:?}", sid, e); }
break;
} else if let Some(e) = e.downcast_ref::<tokio::sync::mpsc::error::SendError<Message>>() {
tracing::error!("[{}] Error sending message. Closing session. -- {}", sid, e);
break;
} else {
Expand Down Expand Up @@ -274,7 +278,7 @@ async fn handle_ws_session(
}

/// Extract user id and name from HTTP headers (set by nginx)
fn parse_auth_headers(hdrs: &HeaderMap) -> (String, String)
fn parse_auth_headers(hdrs: &HeaderMap) -> (String, String)
{
fn try_get_first_named_hdr<T>(hdrs: &HeaderMap, names: T) -> Option<String>
where T: IntoIterator<Item=&'static str> {
Expand All @@ -295,7 +299,7 @@ fn parse_auth_headers(hdrs: &HeaderMap) -> (String, String)
}};
let user_name = try_get_first_named_hdr(&hdrs, vec!["X-Remote-User-Name", "X_Remote_User_Name", "HTTP_X_REMOTE_USER_NAME"])
.unwrap_or_else(|| user_id.clone());

(user_id, user_name)
}

Expand Down Expand Up @@ -399,7 +403,7 @@ async fn run_api_server_async(
if let Err(_) = server_state.send_to_all_video_sessions(&vh, &msg) {
tracing::error!(video=vh, "Failed to send notification to video hash.");
}
}
}
};

// Message to a single user
Expand Down
14 changes: 9 additions & 5 deletions server/src/api_server/ws_handers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,12 @@ pub async fn msg_collab_report(data: &serde_json::Value, ses: &mut WsSessionArgs
ses.emit_cmd("collab_cmd", &msg, super::SendTo::CurCollab()).map(|_| ())
}

// custom logout error with thiserror

pub async fn msg_logout(data: &serde_json::Value, ses: &mut WsSessionArgs<'_>) -> Res<()> {
tracing::info!("logout: user={}", ses.user_id);
drop(ses.sender);
Ok(())
#[derive(thiserror::Error, Debug)]
pub enum SessionClose {
#[error("User logout")]
Logout,
}


Expand All @@ -432,7 +433,10 @@ pub async fn msg_dispatch(cmd: &str, data: &serde_json::Value, ses: &mut WsSessi
"join_collab" => msg_join_collab(data, ses).await,
"leave_collab" => msg_leave_collab(data, ses).await,
"collab_report" => msg_collab_report(data, ses).await,
"logout" => msg_logout(data, ses).await,
"logout" => {
tracing::info!("logout from client: user={}", ses.user_id);
return Err(SessionClose::Logout.into());
},
"echo" => {
let answ = format!("Echo: {}", data.as_str().ok_or(anyhow!("data not found"))?);
ses.sender.send(WsMsg::text(answ))?;
Expand Down

0 comments on commit 28c6b4b

Please sign in to comment.