Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to nails 0.11.0 #11370

Merged
merged 1 commit into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/nailgun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async_latch = { path = "../async_latch" }
bytes = "0.5"
futures = "0.3"
log = "0.4"
nails = "0.8"
nails = "0.11"
os_pipe = "0.9"
task_executor = { path = "../task_executor" }
tokio = { version = "0.2.23", features = ["tcp", "fs", "sync", "io-std", "signal"] }
Expand Down
8 changes: 0 additions & 8 deletions src/rust/engine/nailgun/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ async fn handle_client_output(
NailgunClientError::PostConnect(format!("Failed to flush stderr: {}", err))
})?
},
Some(ChildOutput::Exit(_)) => {
// NB: We ignore exit here and allow the main thread to handle exiting. This API is
// error prone: see https://github.com/stuhood/nails/issues/1 for more info.
}
None => break,
}
}
Expand Down Expand Up @@ -100,10 +96,6 @@ async fn handle_client_input(mut stdin_write: mpsc::Sender<ChildInput>) -> Resul
.await
.map_err(send_to_io)?;
}
stdin_write
.send(ChildInput::StdinEOF)
.await
.map_err(send_to_io)?;
Ok(())
}

Expand Down
112 changes: 48 additions & 64 deletions src/rust/engine/nailgun/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ use std::sync::Arc;

use async_latch::AsyncLatch;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
use log::{debug, error, info};
use nails::execution::{self, sink_for, stream_for, ChildInput, ChildOutput, ExitCode};
use log::{debug, info};
use nails::execution::{
self, child_channel, sink_for, stream_for, ChildInput, ChildOutput, ExitCode,
};
use nails::Nail;
use task_executor::Executor;
use tokio::fs::File;
Expand Down Expand Up @@ -227,85 +229,67 @@ struct RawFdNail {
}

impl Nail for RawFdNail {
fn spawn(
&self,
cmd: execution::Command,
input_stream: mpsc::Receiver<ChildInput>,
) -> Result<nails::server::Child, io::Error> {
fn spawn(&self, cmd: execution::Command) -> Result<nails::server::Child, io::Error> {
let env = cmd.env.iter().cloned().collect::<HashMap<_, _>>();

// Handle stdin. If the input stream closes, the run is cancelled.
let cancelled = AsyncLatch::new();
// Handle stdin.
let (stdin_handle, stdin_sink) = Self::input(Self::ttypath_from_env(&env, 0))?;
let accepts_stdin = {
let (accepts_stdin, input_stream_fut) = if let Some(mut stdin_sink) = stdin_sink {
// Forward all stdin to the child process.
(
true,
async move {
let mut input_stream = input_stream.filter_map(|child_input| {
Box::pin(async move {
match child_input {
ChildInput::Stdin(bytes) => Some(Ok(bytes)),
ChildInput::StdinEOF => None,
}
})
});
let _ = stdin_sink.send_all(&mut input_stream).await;
}
.boxed(),
)
} else {
// Stdin will be handled directly by the TTY. Only propagate cancellation.
(false, input_stream.fold((), |(), _| async {}).boxed())
};
// Spawn a task that will propagate the input stream, and then trigger cancellation.
let cancelled = cancelled.clone();
let _join = self.executor.spawn(input_stream_fut.map(move |_| {
cancelled.trigger();
}));
accepts_stdin
let maybe_stdin_write = if let Some(mut stdin_sink) = stdin_sink {
let (stdin_write, stdin_read) = child_channel::<ChildInput>();
// Spawn a task that will propagate the input stream.
let _join = self.executor.spawn(async move {
let mut input_stream = stdin_read.map(|child_input| match child_input {
ChildInput::Stdin(bytes) => Ok(bytes),
});
let _ = stdin_sink.send_all(&mut input_stream).await;
});
Some(stdin_write)
} else {
// Stdin will be handled directly by the TTY.
None
};

// And stdout/stderr.
let (stdout_stream, stdout_handle) = Self::output(Self::ttypath_from_env(&env, 1))?;
let (stderr_stream, stderr_handle) = Self::output(Self::ttypath_from_env(&env, 2))?;

// Set up a cancellation token that is triggered on client shutdown.
let cancelled = AsyncLatch::new();
let shutdown = {
let cancelled = cancelled.clone();
async move {
cancelled.trigger();
}
};

// Spawn the underlying function as a blocking task, and capture its exit code to append to the
// output stream.
let nail = self.clone();
let exit_code_future = self.executor.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
let exit_code = self
.executor
.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
})
});
.boxed();

// Fully consume the stdout/stderr streams before waiting on the exit stream.
// Select a single stdout/stderr stream.
let stdout_stream = stdout_stream.map_ok(ChildOutput::Stdout);
let stderr_stream = stderr_stream.map_ok(ChildOutput::Stderr);
let exit_stream = exit_code_future
.into_stream()
.map(|exit_code| Ok(ChildOutput::Exit(exit_code)));
let output_stream = stream::select(stdout_stream, stderr_stream)
.chain(exit_stream)
.map(|res| match res {
Ok(o) => o,
Err(e) => {
error!("IO error interacting with the runner: {:?}", e);
ChildOutput::Exit(ExitCode(-1))
}
})
.boxed();
let output_stream = stream::select(stdout_stream, stderr_stream).boxed();

Ok(nails::server::Child {
Ok(nails::server::Child::new(
output_stream,
accepts_stdin,
})
maybe_stdin_write,
exit_code,
Some(shutdown.boxed()),
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures = "0.3"
hashing = { path = "../hashing" }
libc = "0.2.39"
log = "0.4"
nails = "0.8"
nails = "0.11"
sha2 = "0.9"
sharded_lmdb = { path = "../sharded_lmdb" }
shell-quote = "0.1.0"
Expand Down
12 changes: 11 additions & 1 deletion src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fs::{
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use log::{debug, info};
use nails::execution::{ChildOutput, ExitCode};
use nails::execution::ExitCode;
use shell_quote::bash;
use store::{OneOffStoreFileByDigest, Snapshot, Store};
use tokio::process::{Child, Command};
Expand Down Expand Up @@ -178,6 +178,16 @@ impl HermeticCommand {
}
}

// TODO: A Stream that ends with `Exit` is error prone: we should consider creating a Child struct
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Track with a Pants issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not the source of any trouble right now, afaik. So I don't think it meets the bar for an issue at the moment.

// similar to nails::server::Child (which is itself shaped like `std::process::Child`).
// See https://github.com/stuhood/nails/issues/1 for more info.
#[derive(Debug, PartialEq, Eq)]
pub enum ChildOutput {
Stdout(Bytes),
Stderr(Bytes),
Exit(ExitCode),
}

///
/// The fully collected outputs of a completed child process.
///
Expand Down
15 changes: 11 additions & 4 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use async_trait::async_trait;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt};
use log::{debug, trace};
use nails::execution::{child_channel, ChildInput, ChildOutput, Command};
use nails::execution::{self, child_channel, ChildInput, Command};
use tokio::net::TcpStream;

use crate::local::CapturedWorkdir;
use crate::local::{CapturedWorkdir, ChildOutput};
use crate::nailgun::nailgun_pool::NailgunProcessName;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
Expand Down Expand Up @@ -276,12 +276,19 @@ impl CapturedWorkdir for CommandRunner {
})
.await?;

let output_stream = child.output_stream.take().unwrap();
let output_stream = child
.output_stream
.take()
.unwrap()
.map(|output| match output {
execution::ChildOutput::Stdout(bytes) => Ok(ChildOutput::Stdout(bytes)),
execution::ChildOutput::Stderr(bytes) => Ok(ChildOutput::Stderr(bytes)),
});
let exit_code = child
.wait()
.map_ok(ChildOutput::Exit)
.map_err(|e| format!("Error communicating with server: {}", e));

Ok(futures::stream::select(output_stream.map(Ok), exit_code.into_stream()).boxed())
Ok(futures::stream::select(output_stream, exit_code.into_stream()).boxed())
}
}