From e8f955906293fcfe84d73a8af4a3c7f458708d06 Mon Sep 17 00:00:00 2001 From: Thomas Linford Date: Mon, 16 May 2022 21:14:57 +0200 Subject: [PATCH] fix(pty): paste freeze with large amounts of text (#1383) add pty writer thread to avoid screen thread blocking on unistd::write --- zellij-server/src/lib.rs | 30 +++++++++++++++++++++++++++++ zellij-server/src/pty_writer.rs | 34 +++++++++++++++++++++++++++++++++ zellij-server/src/tab/mod.rs | 17 ++++++++--------- zellij-server/src/thread_bus.rs | 24 +++++++++++++++++++++-- zellij-utils/src/errors.rs | 7 +++++++ 5 files changed, 101 insertions(+), 11 deletions(-) create mode 100644 zellij-server/src/pty_writer.rs diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index 0e530dfcbe..c170541f58 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -5,6 +5,7 @@ pub mod tab; mod logging_pipe; mod pty; +mod pty_writer; mod route; mod screen; mod thread_bus; @@ -12,6 +13,7 @@ mod ui; mod wasm_vm; use log::info; +use pty_writer::{pty_writer_main, PtyWriteInstruction}; use std::collections::{HashMap, HashSet}; use std::{ path::PathBuf, @@ -106,6 +108,7 @@ pub(crate) struct SessionMetaData { screen_thread: Option>, pty_thread: Option>, wasm_thread: Option>, + pty_writer_thread: Option>, } impl Drop for SessionMetaData { @@ -116,6 +119,7 @@ impl Drop for SessionMetaData { let _ = self.screen_thread.take().unwrap().join(); let _ = self.pty_thread.take().unwrap().join(); let _ = self.wasm_thread.take().unwrap().join(); + let _ = self.pty_writer_thread.take().unwrap().join(); } } @@ -583,6 +587,10 @@ fn init_session( let (to_pty, pty_receiver): ChannelWithContext = channels::unbounded(); let to_pty = SenderWithContext::new(to_pty); + let (to_pty_writer, pty_writer_receiver): ChannelWithContext = + channels::unbounded(); + let to_pty_writer = SenderWithContext::new(to_pty_writer); + // Determine and initialize the data directory let data_dir = opts.data_dir.unwrap_or_else(get_default_data_dir); @@ -607,6 +615,7 @@ fn init_session( None, Some(&to_plugin), Some(&to_server), + Some(&to_pty_writer), Some(os_input.clone()), ), opts.debug, @@ -625,6 +634,7 @@ fn init_session( Some(&to_pty), Some(&to_plugin), Some(&to_server), + Some(&to_pty_writer), Some(os_input.clone()), ); let max_panes = opts.max_panes; @@ -644,6 +654,7 @@ fn init_session( Some(&to_pty), Some(&to_plugin), None, + Some(&to_pty_writer), None, ); let store = Store::default(); @@ -651,11 +662,29 @@ fn init_session( move || wasm_thread_main(plugin_bus, store, data_dir, plugins.unwrap_or_default()) }) .unwrap(); + + let pty_writer_thread = thread::Builder::new() + .name("pty_writer".to_string()) + .spawn({ + let pty_writer_bus = Bus::new( + vec![pty_writer_receiver], + Some(&to_screen), + Some(&to_pty), + Some(&to_plugin), + Some(&to_server), + None, + Some(os_input.clone()), + ); + || pty_writer_main(pty_writer_bus) + }) + .unwrap(); + SessionMetaData { senders: ThreadSenders { to_screen: Some(to_screen), to_pty: Some(to_pty), to_plugin: Some(to_plugin), + to_pty_writer: Some(to_pty_writer), to_server: None, should_silently_fail: false, }, @@ -665,5 +694,6 @@ fn init_session( screen_thread: Some(screen_thread), pty_thread: Some(pty_thread), wasm_thread: Some(wasm_thread), + pty_writer_thread: Some(pty_writer_thread), } } diff --git a/zellij-server/src/pty_writer.rs b/zellij-server/src/pty_writer.rs new file mode 100644 index 0000000000..9cdcd9769d --- /dev/null +++ b/zellij-server/src/pty_writer.rs @@ -0,0 +1,34 @@ +use zellij_utils::errors::{ContextType, PtyWriteContext}; + +use crate::thread_bus::Bus; + +#[derive(Debug, Clone)] +pub(crate) enum PtyWriteInstruction { + Write(Vec, i32), +} + +impl From<&PtyWriteInstruction> for PtyWriteContext { + fn from(tty_write_instruction: &PtyWriteInstruction) -> Self { + match *tty_write_instruction { + PtyWriteInstruction::Write(..) => PtyWriteContext::Write, + } + } +} + +pub(crate) fn pty_writer_main(bus: Bus) { + loop { + let (event, mut err_ctx) = bus.recv().expect("failed to receive event on channel"); + err_ctx.add_call(ContextType::PtyWrite((&event).into())); + let os_input = bus.os_input.clone().unwrap(); + match event { + PtyWriteInstruction::Write(bytes, terminal_id) => { + if let Err(e) = os_input.write_to_tty_stdin(terminal_id, &bytes) { + log::error!("failed to write to terminal: {}", e); + } + if let Err(e) = os_input.tcdrain(terminal_id) { + log::error!("failed to drain terminal: {}", e); + }; + } + } + } +} diff --git a/zellij-server/src/tab/mod.rs b/zellij-server/src/tab/mod.rs index 3ab22b3513..56e06c9278 100644 --- a/zellij-server/src/tab/mod.rs +++ b/zellij-server/src/tab/mod.rs @@ -9,6 +9,7 @@ use zellij_tile::prelude::Style; use zellij_utils::position::{Column, Line}; use zellij_utils::{position::Position, serde, zellij_tile}; +use crate::pty_writer::PtyWriteInstruction; use crate::screen::CopyOptions; use crate::ui::pane_boundaries_frame::FrameParams; @@ -869,15 +870,13 @@ impl Tab { .get(&pane_id) .unwrap_or_else(|| self.tiled_panes.get_pane(pane_id).unwrap()); let adjusted_input = active_terminal.adjust_input_to_terminal(input_bytes); - if let Err(e) = self - .os_api - .write_to_tty_stdin(active_terminal_id, &adjusted_input) - { - log::error!("failed to write to terminal: {}", e); - } - if let Err(e) = self.os_api.tcdrain(active_terminal_id) { - log::error!("failed to drain terminal: {}", e); - } + + self.senders + .send_to_pty_writer(PtyWriteInstruction::Write( + adjusted_input, + active_terminal_id, + )) + .unwrap(); } PaneId::Plugin(pid) => { for key in parse_keys(&input_bytes) { diff --git a/zellij-server/src/thread_bus.rs b/zellij-server/src/thread_bus.rs index c726299334..f2fd672e0e 100644 --- a/zellij-server/src/thread_bus.rs +++ b/zellij-server/src/thread_bus.rs @@ -1,8 +1,8 @@ //! Definitions and helpers for sending and receiving messages between threads. use crate::{ - os_input_output::ServerOsApi, pty::PtyInstruction, screen::ScreenInstruction, - wasm_vm::PluginInstruction, ServerInstruction, + os_input_output::ServerOsApi, pty::PtyInstruction, pty_writer::PtyWriteInstruction, + screen::ScreenInstruction, wasm_vm::PluginInstruction, ServerInstruction, }; use zellij_utils::{channels, channels::SenderWithContext, errors::ErrorContext}; @@ -13,6 +13,7 @@ pub(crate) struct ThreadSenders { pub to_pty: Option>, pub to_plugin: Option>, pub to_server: Option>, + pub to_pty_writer: Option>, // this is a convenience for the unit tests // it's not advisable to set it to true in production code pub should_silently_fail: bool, @@ -82,6 +83,22 @@ impl ThreadSenders { self.to_server.as_ref().unwrap().send(instruction) } } + pub fn send_to_pty_writer( + &self, + instruction: PtyWriteInstruction, + ) -> Result<(), channels::SendError<(PtyWriteInstruction, ErrorContext)>> { + if self.should_silently_fail { + let _ = self + .to_pty_writer + .as_ref() + .map(|sender| sender.send(instruction)) + .unwrap_or_else(|| Ok(())); + Ok(()) + } else { + self.to_pty_writer.as_ref().unwrap().send(instruction) + } + } + #[allow(unused)] pub fn silently_fail_on_send(mut self) -> Self { // this is mostly used for the tests, see struct @@ -105,6 +122,7 @@ impl Bus { to_pty: Option<&SenderWithContext>, to_plugin: Option<&SenderWithContext>, to_server: Option<&SenderWithContext>, + to_pty_writer: Option<&SenderWithContext>, os_input: Option>, ) -> Self { Bus { @@ -114,6 +132,7 @@ impl Bus { to_pty: to_pty.cloned(), to_plugin: to_plugin.cloned(), to_server: to_server.cloned(), + to_pty_writer: to_pty_writer.cloned(), should_silently_fail: false, }, os_input: os_input.clone(), @@ -129,6 +148,7 @@ impl Bus { to_pty: None, to_plugin: None, to_server: None, + to_pty_writer: None, should_silently_fail: true, }, os_input: None, diff --git a/zellij-utils/src/errors.rs b/zellij-utils/src/errors.rs index 396fc9c59f..ec709b21e2 100644 --- a/zellij-utils/src/errors.rs +++ b/zellij-utils/src/errors.rs @@ -182,6 +182,7 @@ pub enum ContextType { IPCServer(ServerContext), StdinHandler, AsyncTask, + PtyWrite(PtyWriteContext), /// An empty, placeholder call. This should be thought of as representing no call at all. /// A call stack representation filled with these is the representation of an empty call stack. Empty, @@ -197,6 +198,7 @@ impl Display for ContextType { ContextType::IPCServer(c) => Some(("ipc_server:", format!("{:?}", c))), ContextType::StdinHandler => Some(("stdin_handler_thread:", "AcceptInput".to_string())), ContextType::AsyncTask => Some(("stream_terminal_bytes:", "AsyncTask".to_string())), + ContextType::PtyWrite(c) => Some(("pty_writer_thread:", format!("{:?}", c))), ContextType::Empty => None, } { write!(f, "{} {}", left.purple(), right.green()) @@ -337,3 +339,8 @@ pub enum ServerContext { AttachClient, ConnStatus, } + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum PtyWriteContext { + Write, +}