From 2afb355e4856b1819af0c54e57d6476656630eca Mon Sep 17 00:00:00 2001 From: Aram Drevekenin Date: Fri, 18 Nov 2022 10:21:59 +0100 Subject: [PATCH] fix(router): handle client buffer overflow (#1955) * fix(router): handle client buffer overflow * style(fmt): rustfmt --- zellij-server/src/os_input_output.rs | 53 +++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs index 24a2ace217..07bd7c0f42 100644 --- a/zellij-server/src/os_input_output.rs +++ b/zellij-server/src/os_input_output.rs @@ -13,12 +13,15 @@ use nix::{ use signal_hook::consts::*; use sysinfo::{ProcessExt, ProcessRefreshKind, System, SystemExt}; use zellij_utils::{ - async_std, + async_std, channels, data::Palette, errors::prelude::*, input::command::{RunCommand, TerminalAction}, interprocess, - ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg}, + ipc::{ + ClientToServerMsg, ExitReason, IpcReceiverWithContext, IpcSenderWithContext, + ServerToClientMsg, + }, libc, nix, shared::default_palette, signal_hook, @@ -323,10 +326,50 @@ fn spawn_terminal( handle_terminal(cmd, failover_cmd, orig_termios, quit_cb, terminal_id) } +// The ClientSender is in charge of sending messages to the client on a special thread +// This is done so that when the unix socket buffer is full, we won't block the entire router +// thread +// When the above happens, the ClientSender buffers messages in hopes that the congestion will be +// freed until we runs out of buffer space. +// If we run out of buffer space, we bubble up an error sot hat the router thread will give up on +// this client and we'll stop sending messages to it. +// If the client ever becomes responsive again, we'll send one final "Buffer full" message so it +// knows what happened. +#[derive(Clone)] +struct ClientSender { + client_id: ClientId, + client_buffer_sender: channels::Sender, +} + +impl ClientSender { + pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext) -> Self { + let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50); + std::thread::spawn(move || { + let err_context = || format!("failed to send message to client {client_id}"); + for msg in client_buffer_receiver.iter() { + let _ = sender.send(msg).with_context(err_context); + } + let _ = sender.send(ServerToClientMsg::Exit(ExitReason::Error( + "Buffer full".to_string(), + ))); + }); + ClientSender { + client_id, + client_buffer_sender, + } + } + pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> { + let err_context = || format!("Client {} send buffer full", self.client_id); + self.client_buffer_sender + .try_send(msg) + .with_context(err_context) + } +} + #[derive(Clone)] pub struct ServerOsInputOutput { orig_termios: Arc>, - client_senders: Arc>>>, + client_senders: Arc>>, terminal_id_to_raw_fd: Arc>>>, // A value of None means the // terminal_id exists but is // not connected to an fd (eg. @@ -589,7 +632,7 @@ impl ServerOsApi for ServerOsInputOutput { .with_context(err_context)? .get_mut(&client_id) { - sender.send(msg).with_context(err_context) + sender.send_or_buffer(msg).with_context(err_context) } else { Ok(()) } @@ -601,7 +644,7 @@ impl ServerOsApi for ServerOsInputOutput { stream: LocalSocketStream, ) -> Result> { let receiver = IpcReceiverWithContext::new(stream); - let sender = receiver.get_sender(); + let sender = ClientSender::new(client_id, receiver.get_sender()); self.client_senders .lock() .to_anyhow()