From 7968cc002c7b15aba9069e438482d8e1df8ae17a Mon Sep 17 00:00:00 2001 From: 160R <160R@protonmail.com> Date: Thu, 22 Dec 2022 15:19:24 +0100 Subject: [PATCH] fix missed lines on -z due to buffered io --- README.md | 2 +- src/connection.rs | 2 + src/pager/context.rs | 5 ++ src/pager/main.rs | 129 +++++++++++++++++++++++++++++++------------ 4 files changed, 103 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 44e466d..09c41d9 100644 --- a/README.md +++ b/README.md @@ -284,7 +284,7 @@ compdef _nv nv # if you have completions installed To automatically `lcd` into terminal's directory: -``` +```zsh chpwd () { [ ! -z "$NVIM" ] && nv -x "lcd $PWD" } diff --git a/src/connection.rs b/src/connection.rs index 38100df..951a872 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -228,6 +228,8 @@ pub async fn open>>( pub async fn close_and_exit>>( nvim_connection: &mut NeovimConnection ) -> ! { + log::trace!(target: "exit", "close and exit"); + if let Some(ref mut process) = nvim_connection.nvim_proc { if !process.is_finished() { process diff --git a/src/pager/context.rs b/src/pager/context.rs index 5efac49..ed9482f 100644 --- a/src/pager/context.rs +++ b/src/pager/context.rs @@ -50,6 +50,11 @@ pub mod gather_env { opt.pagerized(); } + // Don't pagerize with -p enabled or when not read from pipe + if opt.pty_path_print || !input_from_pipe { + opt.pagerize = None; + } + // Fallback for neovim < 8.0 which don't uses $NVIM if opt.address.is_none() { if let Some(address) = std::env::var("NVIM_LISTEN_ADDRESS").ok() { diff --git a/src/pager/main.rs b/src/pager/main.rs index f27a13a..ded6f34 100644 --- a/src/pager/main.rs +++ b/src/pager/main.rs @@ -419,7 +419,9 @@ async fn manage_output_buffer( .execute_disconnect_commands() .await; - connection::close_and_exit(nvim_conn).await; + outp_buf_actions + .done() + .await; } @@ -602,7 +604,10 @@ mod neovim_api_usage { mod output_buffer_usage { use super::{NeovimConnection, NeovimBuffer, context::OutputContext}; use connection::NotificationFromNeovim; - use std::io::{Read, Write}; + use std::{ + io::{Read, Write}, + process::ExitStatus + }; /// This struct implements actions that should be done /// after output buffer is attached @@ -610,8 +615,9 @@ mod output_buffer_usage { nvim_conn: &'a mut NeovimConnection, outp_ctx: &'a OutputContext, buf: NeovimBuffer, - buf_pty: once_cell::unsync::OnceCell, + sink: Option>, lines_displayed: usize, + child_page_process: Option>> } pub fn begin<'a>( @@ -623,8 +629,9 @@ mod output_buffer_usage { nvim_conn, outp_ctx, buf, - buf_pty: once_cell::unsync::OnceCell::new(), + sink: None, lines_displayed: 0, + child_page_process: None, } } @@ -711,7 +718,7 @@ mod output_buffer_usage { const CLEAR_SCREEN_SEQ: &[u8] = b"\x1B[3J\x1B[H\x1b[2J"; self - .get_buffer_pty() + .get_sink() .write_all(CLEAR_SCREEN_SEQ) .expect("Cannot write clear screen sequence"); } @@ -793,6 +800,8 @@ mod output_buffer_usage { /// Writes lines from stdin directly into PTY device /// associated with output buffer. pub async fn handle_output(&mut self) { + log::trace!(target: "output", "handle output"); + // First write all prefetched lines if any available for ln in &self.outp_ctx.prefetched_lines.0[..] { @@ -804,7 +813,6 @@ mod output_buffer_usage { .should_pagerize(self.lines_displayed) { self.pagerize_output(); - return } } @@ -835,19 +843,28 @@ mod output_buffer_usage { .should_pagerize(self.lines_displayed) { self.pagerize_output(); - return } } Ok(b) => ln.push(b) } } + + log::trace!(target: "output", "got EOF"); + + self.close_sink(); + + self.display_line(&[b'\0']) + .await + .expect("Cannot write EOF sequence"); } /// In case if -q argument provided it /// might block until next line will be request from neovim side. pub async fn handle_query_output(&mut self) { + log::trace!(target: "output", "handle query output"); + let mut state = QueryState::default(); state.next_part(self.outp_ctx.query_lines_count); @@ -858,7 +875,7 @@ mod output_buffer_usage { .await; let Some(ln) = prefetched_lines_iter.next() else { - log::info!(target: "output", "Proceed with stdin"); + log::info!(target: "output", "Proceed query with stdin"); break }; @@ -876,7 +893,6 @@ mod output_buffer_usage { .await; self.pagerize_output(); - return } } @@ -914,14 +930,18 @@ mod output_buffer_usage { .should_pagerize(self.lines_displayed) { self.pagerize_output(); - return } } Ok(b) => ln.push(b) } + } + log::trace!(target: "output", "got EOF"); + + self.close_sink(); + self.nvim_conn.nvim_actions .notify_query_finished(state.how_many_lines_was_sent()) .await; @@ -939,7 +959,7 @@ mod output_buffer_usage { /// If no such notification was arrived then page crashes /// with the received IO error async fn display_line(&mut self, ln: &[u8]) -> std::io::Result<()> { - let pty = self.get_buffer_pty(); + let pty = self.get_sink(); if let Err(e) = pty.write_all(ln) { log::info!(target: "writeline", "got error: {e:?}"); @@ -957,7 +977,8 @@ mod output_buffer_usage { "Buffer was closed, not all input is shown" ); - connection::close_and_exit(self.nvim_conn).await + self.done() + .await }, Ok(None) if self.nvim_conn.nvim_proc.is_some() => { log::info!( @@ -965,7 +986,8 @@ mod output_buffer_usage { "Neovim was closed, not all input is shown" ); - connection::close_and_exit(self.nvim_conn).await + self.done() + .await; }, _ => return Err(e), @@ -979,7 +1001,13 @@ mod output_buffer_usage { /// If there's more than 90_000 lines to read and -z flag provided /// then output will be pagerized through spawning `page` again and again - fn pagerize_output(&self) { + fn pagerize_output(&mut self) { + if self.child_page_process.is_some() { + return + } + + log::trace!(target: "pagerize", "output is too large"); + let mut page_args = std::env::args(); page_args.next(); // skip `page` @@ -996,16 +1024,26 @@ mod output_buffer_usage { .to_string() }; - std::process::Command::new("page") + let mut page_process = std::process::Command::new("./page") + .stdin(std::process::Stdio::piped()) .args(page_args) .arg("--pagerize-hidden") - .env("NVIM", &nvim_addr) + .arg(&format!("-a={nvim_addr}")) .spawn() - .expect("Cannot spawn `page`") - .wait() - .expect("`page` died unexpectedly") - .code() - .unwrap_or(0); + .expect("Cannot spawn `page`"); + + self.sink + .replace( + Box::new(page_process.stdin + .take() + .unwrap() + ) + ); + + let page_proc = tokio::task::spawn(async move { + page_process.wait() + }); + self.child_page_process = Some(page_proc); } @@ -1034,12 +1072,14 @@ mod output_buffer_usage { Some(NotificationFromNeovim::BufferClosed) => { log::info!(target: "output-state", "Buffer closed"); - connection::close_and_exit(self.nvim_conn).await + self.done() + .await; } None => { log::info!(target: "output-state", "Neovim closed"); - connection::close_and_exit(self.nvim_conn).await + self.done() + .await; } } } @@ -1098,20 +1138,41 @@ mod output_buffer_usage { } } + /// Closes child page process if some and neovim connection + /// then exits with 0 status code + pub async fn done(&mut self) { + log::trace!(target: "done", "now page can exit"); + + if let Some(page_proc) = &mut self.child_page_process { + log::trace!(target: "done", "wait on pagerized page"); + + if !page_proc.is_finished() { + page_proc.await; + } + } + + connection::close_and_exit(self.nvim_conn).await + } + /// Returns PTY device associated with output buffer. /// This function ensures that PTY device is opened only once - fn get_buffer_pty(&mut self) -> &mut std::fs::File { - self.buf_pty - .get_or_init(|| { - std::fs::OpenOptions::new() - .append(true) - .open(&self.outp_ctx.buf_pty_path) - .expect("Cannot open PTY device") - }); - - self.buf_pty.get_mut() - .unwrap() + fn get_sink(&mut self) -> &mut Box { + self.sink + .get_or_insert_with(|| { + Box::new( + std::fs::OpenOptions::new() + .append(true) + .open(&self.outp_ctx.buf_pty_path) + .expect("Cannot open PTY device") + ) + }) + } + + fn close_sink(&mut self) { + self.sink + .take(); } + } /// Encapsulates state of querying lines from neovim side