Skip to content

Commit

Permalink
fix(router): gracefully handle client crashes (#1710)
Browse files Browse the repository at this point in the history
* fix(router): gracefully handle client crashes

* style(comments): remove unused
  • Loading branch information
imsnif authored Sep 2, 2022
1 parent 93f0f78 commit d68d407
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn assert_socket(name: &str) -> bool {
match LocalSocketStream::connect(path) {
Ok(stream) => {
let mut sender = IpcSenderWithContext::new(stream);
sender.send(ClientToServerMsg::ConnStatus);
let _ = sender.send(ClientToServerMsg::ConnStatus);
let mut receiver: IpcReceiverWithContext<ServerToClientMsg> = sender.get_receiver();
match receiver.recv() {
Some((ServerToClientMsg::Connected, _)) => true,
Expand Down Expand Up @@ -115,7 +115,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) {
Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
},
Err(e) => {
eprintln!("Error occurred: {:?}", e);
Expand Down
4 changes: 3 additions & 1 deletion zellij-client/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ impl ClientOsApi for ClientOsInputOutput {
}

fn send_to_server(&self, msg: ClientToServerMsg) {
self.send_instructions_to_server
// TODO: handle the error here, right now we silently ignore it
let _ = self
.send_instructions_to_server
.lock()
.unwrap()
.as_mut()
Expand Down
2 changes: 1 addition & 1 deletion zellij-client/src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) fn kill_session(name: &str) {
let path = &*ZELLIJ_SOCK_DIR.join(name);
match LocalSocketStream::connect(path) {
Ok(stream) => {
IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
let _ = IpcSenderWithContext::new(stream).send(ClientToServerMsg::KillSession);
},
Err(e) => {
eprintln!("Error occurred: {:?}", e);
Expand Down
50 changes: 40 additions & 10 deletions zellij-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ macro_rules! remove_client {
};
}

macro_rules! send_to_client {
($client_id:expr, $os_input:expr, $msg:expr, $session_state:expr) => {
let send_to_client_res = $os_input.send_to_client($client_id, $msg);
if let Err(_) = send_to_client_res {
// failed to send to client, remove it
remove_client!($client_id, $os_input, $session_state);
}
};
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SessionState {
clients: HashMap<ClientId, Option<Size>>,
Expand Down Expand Up @@ -392,15 +402,26 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
Event::ModeUpdate(mode_info),
))
.unwrap();
os_input.send_to_client(client_id, ServerToClientMsg::SwitchToMode(mode));
send_to_client!(
client_id,
os_input,
ServerToClientMsg::SwitchToMode(mode),
session_state
);
},
ServerInstruction::UnblockInputThread => {
for client_id in session_state.read().unwrap().clients.keys() {
os_input.send_to_client(*client_id, ServerToClientMsg::UnblockInputThread);
send_to_client!(
*client_id,
os_input,
ServerToClientMsg::UnblockInputThread,
session_state
);
}
},
ServerInstruction::ClientExit(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ =
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size() {
session_data
Expand Down Expand Up @@ -465,14 +486,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::KillSession => {
let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
}
break;
},
ServerInstruction::DetachSession(client_ids) => {
for client_id in client_ids {
os_input.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
if let Some(min_size) = session_state.read().unwrap().min_client_terminal_size()
{
Expand Down Expand Up @@ -509,14 +532,16 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
// If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane.
if let Some(output) = &serialized_output {
for (client_id, client_render_instruction) in output.iter() {
os_input.send_to_client(
send_to_client!(
*client_id,
os_input,
ServerToClientMsg::Render(client_render_instruction.clone()),
session_state
);
}
} else {
for client_id in client_ids {
os_input
let _ = os_input
.send_to_client(client_id, ServerToClientMsg::Exit(ExitReason::Normal));
remove_client!(client_id, os_input, session_state);
}
Expand All @@ -526,7 +551,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
ServerInstruction::Error(backtrace) => {
let client_ids = session_state.read().unwrap().client_ids();
for client_id in client_ids {
os_input.send_to_client(
let _ = os_input.send_to_client(
client_id,
ServerToClientMsg::Exit(ExitReason::Error(backtrace.clone())),
);
Expand All @@ -535,7 +560,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
break;
},
ServerInstruction::ConnStatus(client_id) => {
os_input.send_to_client(client_id, ServerToClientMsg::Connected);
let _ = os_input.send_to_client(client_id, ServerToClientMsg::Connected);
remove_client!(client_id, os_input, session_state);
},
ServerInstruction::ActiveClients(client_id) => {
Expand All @@ -545,7 +570,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
client_ids,
client_id
);
os_input.send_to_client(client_id, ServerToClientMsg::ActiveClients(client_ids));
send_to_client!(
client_id,
os_input,
ServerToClientMsg::ActiveClients(client_ids),
session_state
);
},
}
}
Expand Down
16 changes: 13 additions & 3 deletions zellij-server/src/os_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ pub trait ServerOsApi: Send + Sync {
fn force_kill(&self, pid: Pid) -> Result<(), nix::Error>;
/// Returns a [`Box`] pointer to this [`ServerOsApi`] struct.
fn box_clone(&self) -> Box<dyn ServerOsApi>;
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg);
fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str>;
fn new_client(
&mut self,
client_id: ClientId,
Expand Down Expand Up @@ -373,9 +377,15 @@ impl ServerOsApi for ServerOsInputOutput {
let _ = kill(pid, Some(Signal::SIGKILL));
Ok(())
}
fn send_to_client(&self, client_id: ClientId, msg: ServerToClientMsg) {
fn send_to_client(
&self,
client_id: ClientId,
msg: ServerToClientMsg,
) -> Result<(), &'static str> {
if let Some(sender) = self.client_senders.lock().unwrap().get_mut(&client_id) {
sender.send(msg);
sender.send(msg)
} else {
Ok(())
}
}
fn new_client(
Expand Down
9 changes: 7 additions & 2 deletions zellij-server/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,15 @@ pub(crate) fn route_thread_main(
let client_id = maybe_client_id.unwrap_or(client_id);
if let Some(rlocked_sessions) = rlocked_sessions.as_ref() {
if let Action::SwitchToMode(input_mode) = action {
os_input.send_to_client(
let send_res = os_input.send_to_client(
client_id,
ServerToClientMsg::SwitchToMode(input_mode),
);
if send_res.is_err() {
let _ = to_server
.send(ServerInstruction::RemoveClient(client_id));
return true;
}
}
if route_action(
action,
Expand Down Expand Up @@ -642,7 +647,7 @@ pub(crate) fn route_thread_main(
},
None => {
log::error!("Received empty message from client");
os_input.send_to_client(
let _ = os_input.send_to_client(
client_id,
ServerToClientMsg::Exit(ExitReason::Error(
"Received empty message".to_string(),
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/tab/unit/tab_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/tab/unit/tab_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
6 changes: 5 additions & 1 deletion zellij-server/src/unit/screen_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl ServerOsApi for FakeInputOutput {
fn box_clone(&self) -> Box<dyn ServerOsApi> {
Box::new((*self).clone())
}
fn send_to_client(&self, _client_id: ClientId, _msg: ServerToClientMsg) {
fn send_to_client(
&self,
_client_id: ClientId,
_msg: ServerToClientMsg,
) -> Result<(), &'static str> {
unimplemented!()
}
fn new_client(
Expand Down
14 changes: 9 additions & 5 deletions zellij-utils/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,16 @@ impl<T: Serialize> IpcSenderWithContext<T> {
}

/// Sends an event, along with the current [`ErrorContext`], on this [`IpcSenderWithContext`]'s socket.
pub fn send(&mut self, msg: T) {
pub fn send(&mut self, msg: T) -> Result<(), &'static str> {
let err_ctx = get_current_ctx();
rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).unwrap();
// TODO: unwrapping here can cause issues when the server disconnects which we don't mind
// do we need to handle errors here in other cases?
let _ = self.sender.flush();
if rmp_serde::encode::write(&mut self.sender, &(msg, err_ctx)).is_err() {
Err("Failed to send message to client")
} else {
// TODO: unwrapping here can cause issues when the server disconnects which we don't mind
// do we need to handle errors here in other cases?
let _ = self.sender.flush();
Ok(())
}
}

/// Returns an [`IpcReceiverWithContext`] with the same socket as this sender.
Expand Down

0 comments on commit d68d407

Please sign in to comment.