Skip to content

Commit

Permalink
fix missed lines on -z due to buffered io
Browse files Browse the repository at this point in the history
  • Loading branch information
I60R committed Dec 22, 2022
1 parent b811081 commit 7968cc0
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ compdef _nv nv # if you have completions installed

To automatically `lcd` into terminal's directory:

```
```zsh
chpwd () {
[ ! -z "$NVIM" ] && nv -x "lcd $PWD"
}
Expand Down
2 changes: 2 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ pub async fn open<Apis: From<Neovim<IoWrite>>>(
pub async fn close_and_exit<Apis: From<Neovim<IoWrite>>>(
nvim_connection: &mut NeovimConnection<Apis>
) -> ! {
log::trace!(target: "exit", "close and exit");

if let Some(ref mut process) = nvim_connection.nvim_proc {
if !process.is_finished() {
process
Expand Down
5 changes: 5 additions & 0 deletions src/pager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ pub mod gather_env {
opt.pagerized();
}

// Don't pagerize with -p enabled or when not read from pipe
if opt.pty_path_print || !input_from_pipe {
opt.pagerize = None;
}

// Fallback for neovim < 8.0 which don't uses $NVIM
if opt.address.is_none() {
if let Some(address) = std::env::var("NVIM_LISTEN_ADDRESS").ok() {
Expand Down
129 changes: 95 additions & 34 deletions src/pager/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ async fn manage_output_buffer(
.execute_disconnect_commands()
.await;

connection::close_and_exit(nvim_conn).await;
outp_buf_actions
.done()
.await;
}


Expand Down Expand Up @@ -602,16 +604,20 @@ mod neovim_api_usage {
mod output_buffer_usage {
use super::{NeovimConnection, NeovimBuffer, context::OutputContext};
use connection::NotificationFromNeovim;
use std::io::{Read, Write};
use std::{
io::{Read, Write},
process::ExitStatus
};

/// This struct implements actions that should be done
/// after output buffer is attached
pub struct BufferActions<'a> {
nvim_conn: &'a mut NeovimConnection,
outp_ctx: &'a OutputContext,
buf: NeovimBuffer,
buf_pty: once_cell::unsync::OnceCell<std::fs::File>,
sink: Option<Box<dyn std::io::Write>>,
lines_displayed: usize,
child_page_process: Option<tokio::task::JoinHandle<Result<ExitStatus, std::io::Error>>>
}

pub fn begin<'a>(
Expand All @@ -623,8 +629,9 @@ mod output_buffer_usage {
nvim_conn,
outp_ctx,
buf,
buf_pty: once_cell::unsync::OnceCell::new(),
sink: None,
lines_displayed: 0,
child_page_process: None,
}
}

Expand Down Expand Up @@ -711,7 +718,7 @@ mod output_buffer_usage {

const CLEAR_SCREEN_SEQ: &[u8] = b"\x1B[3J\x1B[H\x1b[2J";
self
.get_buffer_pty()
.get_sink()
.write_all(CLEAR_SCREEN_SEQ)
.expect("Cannot write clear screen sequence");
}
Expand Down Expand Up @@ -793,6 +800,8 @@ mod output_buffer_usage {
/// Writes lines from stdin directly into PTY device
/// associated with output buffer.
pub async fn handle_output(&mut self) {
log::trace!(target: "output", "handle output");

// First write all prefetched lines if any available
for ln in &self.outp_ctx.prefetched_lines.0[..] {

Expand All @@ -804,7 +813,6 @@ mod output_buffer_usage {
.should_pagerize(self.lines_displayed)
{
self.pagerize_output();
return
}
}

Expand Down Expand Up @@ -835,19 +843,28 @@ mod output_buffer_usage {
.should_pagerize(self.lines_displayed)
{
self.pagerize_output();
return
}
}

Ok(b) => ln.push(b)
}
}

log::trace!(target: "output", "got EOF");

self.close_sink();

self.display_line(&[b'\0'])
.await
.expect("Cannot write EOF sequence");
}


/// In case if -q <count> argument provided it
/// might block until next line will be request from neovim side.
pub async fn handle_query_output(&mut self) {
log::trace!(target: "output", "handle query output");

let mut state = QueryState::default();
state.next_part(self.outp_ctx.query_lines_count);

Expand All @@ -858,7 +875,7 @@ mod output_buffer_usage {
.await;

let Some(ln) = prefetched_lines_iter.next() else {
log::info!(target: "output", "Proceed with stdin");
log::info!(target: "output", "Proceed query with stdin");

break
};
Expand All @@ -876,7 +893,6 @@ mod output_buffer_usage {
.await;

self.pagerize_output();
return
}
}

Expand Down Expand Up @@ -914,14 +930,18 @@ mod output_buffer_usage {
.should_pagerize(self.lines_displayed)
{
self.pagerize_output();
return
}
}

Ok(b) => ln.push(b)
}

}

log::trace!(target: "output", "got EOF");

self.close_sink();

self.nvim_conn.nvim_actions
.notify_query_finished(state.how_many_lines_was_sent())
.await;
Expand All @@ -939,7 +959,7 @@ mod output_buffer_usage {
/// If no such notification was arrived then page crashes
/// with the received IO error
async fn display_line(&mut self, ln: &[u8]) -> std::io::Result<()> {
let pty = self.get_buffer_pty();
let pty = self.get_sink();

if let Err(e) = pty.write_all(ln) {
log::info!(target: "writeline", "got error: {e:?}");
Expand All @@ -957,15 +977,17 @@ mod output_buffer_usage {
"Buffer was closed, not all input is shown"
);

connection::close_and_exit(self.nvim_conn).await
self.done()
.await
},
Ok(None) if self.nvim_conn.nvim_proc.is_some() => {
log::info!(
target: "writeline",
"Neovim was closed, not all input is shown"
);

connection::close_and_exit(self.nvim_conn).await
self.done()
.await;
},

_ => return Err(e),
Expand All @@ -979,7 +1001,13 @@ mod output_buffer_usage {

/// If there's more than 90_000 lines to read and -z flag provided
/// then output will be pagerized through spawning `page` again and again
fn pagerize_output(&self) {
fn pagerize_output(&mut self) {
if self.child_page_process.is_some() {
return
}

log::trace!(target: "pagerize", "output is too large");

let mut page_args = std::env::args();
page_args.next(); // skip `page`

Expand All @@ -996,16 +1024,26 @@ mod output_buffer_usage {
.to_string()
};

std::process::Command::new("page")
let mut page_process = std::process::Command::new("./page")
.stdin(std::process::Stdio::piped())
.args(page_args)
.arg("--pagerize-hidden")
.env("NVIM", &nvim_addr)
.arg(&format!("-a={nvim_addr}"))
.spawn()
.expect("Cannot spawn `page`")
.wait()
.expect("`page` died unexpectedly")
.code()
.unwrap_or(0);
.expect("Cannot spawn `page`");

self.sink
.replace(
Box::new(page_process.stdin
.take()
.unwrap()
)
);

let page_proc = tokio::task::spawn(async move {
page_process.wait()
});
self.child_page_process = Some(page_proc);
}


Expand Down Expand Up @@ -1034,12 +1072,14 @@ mod output_buffer_usage {
Some(NotificationFromNeovim::BufferClosed) => {
log::info!(target: "output-state", "Buffer closed");

connection::close_and_exit(self.nvim_conn).await
self.done()
.await;
}
None => {
log::info!(target: "output-state", "Neovim closed");

connection::close_and_exit(self.nvim_conn).await
self.done()
.await;
}
}
}
Expand Down Expand Up @@ -1098,20 +1138,41 @@ mod output_buffer_usage {
}
}

/// Closes child page process if some and neovim connection
/// then exits with 0 status code
pub async fn done(&mut self) {
log::trace!(target: "done", "now page can exit");

if let Some(page_proc) = &mut self.child_page_process {
log::trace!(target: "done", "wait on pagerized page");

if !page_proc.is_finished() {
page_proc.await;
}
}

connection::close_and_exit(self.nvim_conn).await
}

/// Returns PTY device associated with output buffer.
/// This function ensures that PTY device is opened only once
fn get_buffer_pty(&mut self) -> &mut std::fs::File {
self.buf_pty
.get_or_init(|| {
std::fs::OpenOptions::new()
.append(true)
.open(&self.outp_ctx.buf_pty_path)
.expect("Cannot open PTY device")
});

self.buf_pty.get_mut()
.unwrap()
fn get_sink(&mut self) -> &mut Box<dyn std::io::Write> {
self.sink
.get_or_insert_with(|| {
Box::new(
std::fs::OpenOptions::new()
.append(true)
.open(&self.outp_ctx.buf_pty_path)
.expect("Cannot open PTY device")
)
})
}

fn close_sink(&mut self) {
self.sink
.take();
}

}

/// Encapsulates state of querying lines from neovim side
Expand Down

0 comments on commit 7968cc0

Please sign in to comment.