Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🏃 Add a run-mode that executes the input program once and then exits #211

Merged
merged 10 commits into from
Jan 31, 2023
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ name = "viceroy"
path = "src/main.rs"

[dependencies]
anyhow = "^1.0.31"
hyper = { version = "^0.14.20", features = ["full"] }
itertools = "^0.10.5"
serde_json = "^1.0.59"
Expand Down
189 changes: 110 additions & 79 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(dead_code)))))]
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(unused_variables)))))]

use std::process::ExitCode;

mod opts;

use {
Expand All @@ -35,75 +37,7 @@ use {
/// Create a new server, bind it to an address, and serve responses until an error occurs.
pub async fn serve(opts: Opts) -> Result<(), Error> {
// Load the wasm module into an execution context
let mut ctx = ExecuteCtx::new(opts.input(), opts.profiling_strategy(), opts.wasi_modules())?
.with_log_stderr(opts.log_stderr())
.with_log_stdout(opts.log_stdout());

if let Some(config_path) = opts.config_path() {
let config = FastlyConfig::from_file(config_path)?;
let backends = config.backends();
let geolocation = config.geolocation();
let dictionaries = config.dictionaries();
let object_store = config.object_store();
let secret_stores = config.secret_stores();
let backend_names = itertools::join(backends.keys(), ", ");

ctx = ctx
.with_backends(backends.clone())
.with_geolocation(geolocation.clone())
.with_dictionaries(dictionaries.clone())
.with_object_store(object_store.clone())
.with_secret_stores(secret_stores.clone())
.with_config_path(config_path.into());

if backend_names.is_empty() {
event!(
Level::WARN,
"no backend definitions found in {}",
config_path.display()
);
}

for (name, backend) in backends.iter() {
let client = Client::builder().build(BackendConnector::new(
backend.clone(),
ctx.tls_config().clone(),
));
let req = Request::get(&backend.uri).body(Body::empty()).unwrap();

event!(Level::INFO, "checking if backend '{}' is up", name);
match timeout(Duration::from_secs(5), client.request(req)).await {
// In the case that we don't time out but we have an error, we
// check that it's specifically a connection error as this is
// the only one that happens if the server is not up.
//
// We can't combine this with the case above due to needing the
// inner error to check if it's a connection error. The type
// checker complains about it.
Ok(Err(ref e)) if e.is_connect() => event!(
Level::WARN,
"backend '{}' on '{}' is not up right now",
name,
backend.uri
),
// In the case we timeout we assume the backend is not up as 5
// seconds to do a simple get should be enough for a healthy
// service
Err(_) => event!(
Level::WARN,
"backend '{}' on '{}' is not up right now",
name,
backend.uri
),
Ok(_) => event!(Level::INFO, "backend '{}' is up", name),
}
}
} else {
event!(
Level::WARN,
"no configuration provided, invoke with `-C <TOML_FILE>` to provide a configuration"
);
}
let ctx = create_execution_context(&opts).await?;

let addr = opts.addr();
ViceroyService::new(ctx).serve(addr).await?;
Expand All @@ -112,25 +46,43 @@ pub async fn serve(opts: Opts) -> Result<(), Error> {
}

#[tokio::main]
pub async fn main() -> Result<(), Error> {
pub async fn main() -> ExitCode {
// Parse the command-line options, exiting if there are any errors
let opts = Opts::parse();

install_tracing_subscriber(&opts);

tokio::select! {
_ = tokio::signal::ctrl_c() => {
Ok(())
if opts.run_mode() {
// println!("Using Viceroy to run tests...");
match run_wasm_main(opts).await {
Ok(_) => ExitCode::SUCCESS,
Err(_) => ExitCode::FAILURE,
}
res = serve(opts) => {
if let Err(ref e) = res {
event!(Level::ERROR, "{}", e);
} else {
match {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
Ok(())
}
res = serve(opts) => {
if let Err(ref e) = res {
event!(Level::ERROR, "{}", e);
}
res
}
}
res
} {
Ok(_) => ExitCode::SUCCESS,
Err(_) => ExitCode::FAILURE,
}
}
}

/// Execute a Wasm program in the Viceroy environment.
pub async fn run_wasm_main(opts: Opts) -> Result<(), anyhow::Error> {
    // Build an execution context from the CLI options, then run the
    // module's `_start` entry point once, forwarding the trailing args.
    let execute_ctx = create_execution_context(&opts).await?;
    let guest_args = opts.wasm_args();
    execute_ctx.run_main(guest_args).await
}

fn install_tracing_subscriber(opts: &Opts) {
// Default to whatever a user provides, but if not set logging to work for
// viceroy and viceroy-lib so that they can have output in the terminal
Expand All @@ -145,6 +97,11 @@ fn install_tracing_subscriber(opts: &Opts) {
}
}
}
// If the quiet flag is passed in, don't log anything (this should maybe
// just be a verbosity setting)
if opts.quiet() {
env::set_var("RUST_LOG", "viceroy=off,viceroy-lib=off");
}
// Build a subscriber, using the default `RUST_LOG` environment variable for our filter.
let builder = FmtSubscriber::builder()
.with_writer(StdWriter::new())
Expand Down Expand Up @@ -224,3 +181,77 @@ impl<'a> MakeWriter<'a> for StdWriter {
}
}
}

/// Build an `ExecuteCtx` from the command-line options.
///
/// Loads the Wasm input and, when a `-C <TOML_FILE>` configuration is
/// provided, applies its backends, geolocation, dictionaries, object
/// stores, and secret stores. In server mode this also probes each
/// configured backend with a GET request to warn about ones that appear
/// to be down; the probe is skipped in run-mode (`--run`).
async fn create_execution_context(opts: &Opts) -> Result<ExecuteCtx, anyhow::Error> {
    let mut ctx = ExecuteCtx::new(opts.input(), opts.profiling_strategy(), opts.wasi_modules())?
        .with_log_stderr(opts.log_stderr())
        .with_log_stdout(opts.log_stdout());

    if let Some(config_path) = opts.config_path() {
        let config = FastlyConfig::from_file(config_path)?;
        let backends = config.backends();
        let geolocation = config.geolocation();
        let dictionaries = config.dictionaries();
        let object_store = config.object_store();
        let secret_stores = config.secret_stores();

        ctx = ctx
            .with_backends(backends.clone())
            .with_geolocation(geolocation.clone())
            .with_dictionaries(dictionaries.clone())
            .with_object_store(object_store.clone())
            .with_secret_stores(secret_stores.clone())
            .with_config_path(config_path.into());

        // Check the map directly rather than building a joined string of
        // backend names just to test it for emptiness.
        if backends.is_empty() {
            event!(
                Level::WARN,
                "no backend definitions found in {}",
                config_path.display()
            );
        }
        // Backend liveness probes only make sense when serving; a one-shot
        // run should not pay the up-to-5s-per-backend startup cost.
        if !opts.run_mode() {
            for (name, backend) in backends.iter() {
                let client = Client::builder().build(BackendConnector::new(
                    backend.clone(),
                    ctx.tls_config().clone(),
                ));
                let req = Request::get(&backend.uri).body(Body::empty()).unwrap();

                event!(Level::INFO, "checking if backend '{}' is up", name);
                match timeout(Duration::from_secs(5), client.request(req)).await {
                    // In the case that we don't time out but we have an error, we
                    // check that it's specifically a connection error as this is
                    // the only one that happens if the server is not up.
                    //
                    // We can't combine this with the case above due to needing the
                    // inner error to check if it's a connection error. The type
                    // checker complains about it.
                    Ok(Err(ref e)) if e.is_connect() => event!(
                        Level::WARN,
                        "backend '{}' on '{}' is not up right now",
                        name,
                        backend.uri
                    ),
                    // In the case we timeout we assume the backend is not up as 5
                    // seconds to do a simple get should be enough for a healthy
                    // service
                    Err(_) => event!(
                        Level::WARN,
                        "backend '{}' on '{}' is not up right now",
                        name,
                        backend.uri
                    ),
                    Ok(_) => event!(Level::INFO, "backend '{}' is up", name),
                }
            }
        }
    } else {
        event!(
            Level::WARN,
            "no configuration provided, invoke with `-C <TOML_FILE>` to provide a configuration"
        );
    }
    Ok(ctx)
}
62 changes: 62 additions & 0 deletions cli/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub struct Opts {
/// The path to a TOML file containing `local_server` configuration.
#[arg(short = 'C', long = "config")]
config_path: Option<PathBuf>,
/// Use Viceroy to run a module's _start function once, rather than in a
/// web server loop
#[arg(short = 'r', long = "run", default_value = "false")]
run_mode: bool,
/// Whether to treat stdout as a logging endpoint
#[arg(long = "log-stdout", default_value = "false")]
log_stdout: bool,
Expand All @@ -47,6 +51,13 @@ pub struct Opts {
/// Set of experimental WASI modules to link against.
#[arg(value_enum, long = "experimental_modules", required = false)]
experimental_modules: Vec<ExperimentalModuleArg>,
/// Don't log viceroy events to stdout or stderr
#[arg(short = 'q', long = "quiet", default_value = "false")]
quiet: bool,
/// Args to pass along to the binary being executed. This is only used when
/// run_mode=true
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
wasm_args: Vec<String>,
}

impl Opts {
Expand All @@ -66,6 +77,11 @@ impl Opts {
self.config_path.as_deref()
}

/// Whether Viceroy should run the input once and then exit.
///
/// Set by the `-r`/`--run` flag; defaults to `false` (serve mode).
pub fn run_mode(&self) -> bool {
    self.run_mode
}

/// Whether to treat stdout as a logging endpoint
pub fn log_stdout(&self) -> bool {
self.log_stdout
Expand All @@ -88,6 +104,17 @@ impl Opts {
self.profiler.unwrap_or(ProfilingStrategy::None)
}

/// The arguments to pass to the underlying binary when run_mode=true
pub fn wasm_args(&self) -> &[String] {
    // Borrow the owned Vec; deref coercion yields the slice.
    &self.wasm_args
}

/// Prevents Viceroy from logging to stdout and stderr (note: any logs
/// emitted by the INPUT program will still go to stdout/stderr).
///
/// Set by the `-q`/`--quiet` flag; defaults to `false`.
pub fn quiet(&self) -> bool {
    self.quiet
}

// Set of experimental wasi modules to link against.
pub fn wasi_modules(&self) -> HashSet<ExperimentalModule> {
self.experimental_modules.iter().map(|x| x.into()).collect()
Expand Down Expand Up @@ -327,4 +354,39 @@ mod opts_tests {
Err(_) => Ok(()),
}
}

/// Everything after `--` should be collected as arguments destined for
/// the Wasm program, not parsed as Viceroy options.
#[test]
fn trailing_args_are_collected() -> TestResult {
    let input = test_file("minimal.wat");
    let cli = vec![
        "dummy-program-name",
        &input,
        "--",
        "--trailing-arg",
        "--trailing-arg-2",
    ];
    let parsed = Opts::try_parse_from(cli)?;
    assert_eq!(parsed.wasm_args(), &["--trailing-arg", "--trailing-arg-2"]);
    Ok(())
}

/// Input is still accepted after double-dash. This is how the input will be
/// passed by cargo nextest if using Viceroy in run-mode to run tests
#[test]
fn input_accepted_after_double_dash() -> TestResult {
    let args = &[
        "dummy-program-name",
        "--",
        &test_file("minimal.wat"),
        "--trailing-arg",
        "--trailing-arg-2",
    ];
    // Propagate parse failures with `?`, consistent with the sibling
    // option-parsing tests; the match/panic form added no information the
    // test harness would not already report.
    let opts = Opts::try_parse_from(args)?;
    assert_eq!(opts.input().to_str().unwrap(), &test_file("minimal.wat"));
    assert_eq!(opts.wasm_args(), &["--trailing-arg", "--trailing-arg-2"]);
    Ok(())
}
}
54 changes: 54 additions & 0 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Guest code execution.

use std::net::Ipv4Addr;

use anyhow::anyhow;
use {
crate::{
body::Body,
Expand Down Expand Up @@ -354,6 +357,57 @@ impl ExecuteCtx {

outcome
}

pub async fn run_main(self, args: &[String]) -> Result<(), anyhow::Error> {
// placeholders for request, result sender channel, and remote IP
let req = Request::get("http://example.com/").body(Body::empty())?;
let req_id = 0;
let (sender, _) = oneshot::channel();
let remote = Ipv4Addr::LOCALHOST.into();

let session = Session::new(
req_id,
req,
sender,
remote,
self.backends.clone(),
self.geolocation.clone(),
self.tls_config.clone(),
self.dictionaries.clone(),
self.config_path.clone(),
self.object_store.clone(),
self.secret_stores.clone(),
);

let mut store = create_store(&self, session).map_err(ExecutionError::Context)?;
store.data_mut().wasi().push_arg("wasm_program")?;
for arg in args {
store.data_mut().wasi().push_arg(arg)?;
}

let instance = self
.instance_pre
.instantiate_async(&mut store)
.await
.map_err(ExecutionError::Instantiation)?;

// Pull out the `_start` function, which by convention with WASI is the main entry point for
// an application.
let main_func = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(ExecutionError::Typechecking)?;

// Invoke the entrypoint function and collect its exit code
let test_outcome = main_func.call_async(&mut store, ()).await;

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();
match test_outcome {
Ok(_) => Ok(()),
Err(_) => Err(anyhow!("Error running _start")),
}
}
}

fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
Expand Down
Loading