diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 0b615ee8..abf35ba5 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -30,6 +30,7 @@ name = "viceroy" path = "src/main.rs" [dependencies] +anyhow = "^1.0.31" hyper = { version = "^0.14.20", features = ["full"] } itertools = "^0.10.5" serde_json = "^1.0.59" diff --git a/cli/src/main.rs b/cli/src/main.rs index 8b6f1a10..e6718a2e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -13,6 +13,8 @@ #![cfg_attr(not(debug_assertions), doc(test(attr(allow(dead_code)))))] #![cfg_attr(not(debug_assertions), doc(test(attr(allow(unused_variables)))))] +use std::process::ExitCode; + mod opts; use { @@ -35,75 +37,7 @@ use { /// Create a new server, bind it to an address, and serve responses until an error occurs. pub async fn serve(opts: Opts) -> Result<(), Error> { // Load the wasm module into an execution context - let mut ctx = ExecuteCtx::new(opts.input(), opts.profiling_strategy(), opts.wasi_modules())? - .with_log_stderr(opts.log_stderr()) - .with_log_stdout(opts.log_stdout()); - - if let Some(config_path) = opts.config_path() { - let config = FastlyConfig::from_file(config_path)?; - let backends = config.backends(); - let geolocation = config.geolocation(); - let dictionaries = config.dictionaries(); - let object_store = config.object_store(); - let secret_stores = config.secret_stores(); - let backend_names = itertools::join(backends.keys(), ", "); - - ctx = ctx - .with_backends(backends.clone()) - .with_geolocation(geolocation.clone()) - .with_dictionaries(dictionaries.clone()) - .with_object_store(object_store.clone()) - .with_secret_stores(secret_stores.clone()) - .with_config_path(config_path.into()); - - if backend_names.is_empty() { - event!( - Level::WARN, - "no backend definitions found in {}", - config_path.display() - ); - } - - for (name, backend) in backends.iter() { - let client = Client::builder().build(BackendConnector::new( - backend.clone(), - ctx.tls_config().clone(), - )); - let req = 
Request::get(&backend.uri).body(Body::empty()).unwrap(); - - event!(Level::INFO, "checking if backend '{}' is up", name); - match timeout(Duration::from_secs(5), client.request(req)).await { - // In the case that we don't time out but we have an error, we - // check that it's specifically a connection error as this is - // the only one that happens if the server is not up. - // - // We can't combine this with the case above due to needing the - // inner error to check if it's a connection error. The type - // checker complains about it. - Ok(Err(ref e)) if e.is_connect() => event!( - Level::WARN, - "backend '{}' on '{}' is not up right now", - name, - backend.uri - ), - // In the case we timeout we assume the backend is not up as 5 - // seconds to do a simple get should be enough for a healthy - // service - Err(_) => event!( - Level::WARN, - "backend '{}' on '{}' is not up right now", - name, - backend.uri - ), - Ok(_) => event!(Level::INFO, "backend '{}' is up", name), - } - } - } else { - event!( - Level::WARN, - "no configuration provided, invoke with `-C <TOML_FILE>` to provide a configuration" - ); - } + let ctx = create_execution_context(&opts).await?; let addr = opts.addr(); ViceroyService::new(ctx).serve(addr).await?; @@ -112,25 +46,46 @@ pub async fn serve(opts: Opts) -> Result<(), Error> { } #[tokio::main] -pub async fn main() -> Result<(), Error> { +pub async fn main() -> ExitCode { // Parse the command-line options, exiting if there are any errors let opts = Opts::parse(); - install_tracing_subscriber(&opts); - - tokio::select! { - _ = tokio::signal::ctrl_c() => { - Ok(()) + if opts.run_mode() { + match run_wasm_main(opts).await { + Ok(_) => ExitCode::SUCCESS, + Err(_) => ExitCode::FAILURE, } - res = serve(opts) => { - if let Err(ref e) = res { - event!(Level::ERROR, "{}", e); + } else { + match { + tokio::select! 
{ + _ = tokio::signal::ctrl_c() => { + Ok(()) + } + res = serve(opts) => { + if let Err(ref e) = res { + event!(Level::ERROR, "{}", e); + } + res + } + } - res + } { + Ok(_) => ExitCode::SUCCESS, + Err(_) => ExitCode::FAILURE, } } } +/// Execute a Wasm program in the Viceroy environment. +pub async fn run_wasm_main(opts: Opts) -> Result<(), anyhow::Error> { + // Load the wasm module into an execution context + let ctx = create_execution_context(&opts).await?; + let program_name = match opts.input().file_stem() { + Some(stem) => stem.to_string_lossy(), + None => panic!("program cannot be a directory"), + }; + ctx.run_main(&program_name, opts.wasm_args()).await +} + fn install_tracing_subscriber(opts: &Opts) { // Default to whatever a user provides, but if not set logging to work for // viceroy and viceroy-lib so that they can have output in the terminal @@ -145,6 +100,11 @@ fn install_tracing_subscriber(opts: &Opts) { } } } + // If the quiet flag is passed in, don't log anything (this should maybe + // just be a verbosity setting) + if opts.quiet() { + env::set_var("RUST_LOG", "viceroy=off,viceroy-lib=off"); + } // Build a subscriber, using the default `RUST_LOG` environment variable for our filter. let builder = FmtSubscriber::builder() .with_writer(StdWriter::new()) @@ -224,3 +184,77 @@ impl<'a> MakeWriter<'a> for StdWriter { } } } + +async fn create_execution_context(opts: &Opts) -> Result<ExecuteCtx, Error> { + let mut ctx = ExecuteCtx::new(opts.input(), opts.profiling_strategy(), opts.wasi_modules())? 
+ .with_log_stderr(opts.log_stderr()) + .with_log_stdout(opts.log_stdout()); + + if let Some(config_path) = opts.config_path() { + let config = FastlyConfig::from_file(config_path)?; + let backends = config.backends(); + let geolocation = config.geolocation(); + let dictionaries = config.dictionaries(); + let object_store = config.object_store(); + let secret_stores = config.secret_stores(); + let backend_names = itertools::join(backends.keys(), ", "); + + ctx = ctx + .with_backends(backends.clone()) + .with_geolocation(geolocation.clone()) + .with_dictionaries(dictionaries.clone()) + .with_object_store(object_store.clone()) + .with_secret_stores(secret_stores.clone()) + .with_config_path(config_path.into()); + + if backend_names.is_empty() { + event!( + Level::WARN, + "no backend definitions found in {}", + config_path.display() + ); + } + if !opts.run_mode() { + for (name, backend) in backends.iter() { + let client = Client::builder().build(BackendConnector::new( + backend.clone(), + ctx.tls_config().clone(), + )); + let req = Request::get(&backend.uri).body(Body::empty()).unwrap(); + + event!(Level::INFO, "checking if backend '{}' is up", name); + match timeout(Duration::from_secs(5), client.request(req)).await { + // In the case that we don't time out but we have an error, we + // check that it's specifically a connection error as this is + // the only one that happens if the server is not up. + // + // We can't combine this with the case above due to needing the + // inner error to check if it's a connection error. The type + // checker complains about it. 
+ Ok(Err(ref e)) if e.is_connect() => event!( + Level::WARN, + "backend '{}' on '{}' is not up right now", + name, + backend.uri + ), + // In the case we timeout we assume the backend is not up as 5 + // seconds to do a simple get should be enough for a healthy + // service + Err(_) => event!( + Level::WARN, + "backend '{}' on '{}' is not up right now", + name, + backend.uri + ), + Ok(_) => event!(Level::INFO, "backend '{}' is up", name), + } + } + } + } else { + event!( + Level::WARN, + "no configuration provided, invoke with `-C <TOML_FILE>` to provide a configuration" + ); + } + Ok(ctx) +} diff --git a/cli/src/opts.rs b/cli/src/opts.rs index a5af89f0..51865f44 100644 --- a/cli/src/opts.rs +++ b/cli/src/opts.rs @@ -30,6 +30,11 @@ pub struct Opts { /// The path to a TOML file containing `local_server` configuration. #[arg(short = 'C', long = "config")] config_path: Option<PathBuf>, + /// [EXPERIMENTAL] Use Viceroy to run a module's _start function once, + /// rather than in a web server loop. This is experimental and the specific + /// interface for this is going to change in the near future. + #[arg(short = 'r', long = "run", default_value = "false", hide = true)] + run_mode: bool, /// Whether to treat stdout as a logging endpoint #[arg(long = "log-stdout", default_value = "false")] log_stdout: bool, @@ -47,6 +52,13 @@ pub struct Opts { /// Set of experimental WASI modules to link against. #[arg(value_enum, long = "experimental_modules", required = false)] experimental_modules: Vec<ExperimentalModuleArg>, + /// Don't log viceroy events to stdout or stderr + #[arg(short = 'q', long = "quiet", default_value = "false")] + quiet: bool, + /// [EXPERIMENTAL] Args to pass along to the binary being executed. 
This is + /// only used when run_mode=true + #[arg(trailing_var_arg = true, allow_hyphen_values = true, hide = true)] + wasm_args: Vec<String>, } impl Opts { @@ -66,6 +78,11 @@ impl Opts { self.config_path.as_deref() } + /// Whether Viceroy should run the input once and then exit + pub fn run_mode(&self) -> bool { + self.run_mode + } + /// Whether to treat stdout as a logging endpoint pub fn log_stdout(&self) -> bool { self.log_stdout @@ -88,6 +105,17 @@ impl Opts { self.profiler.unwrap_or(ProfilingStrategy::None) } + /// The arguments to pass to the underlying binary when run_mode=true + pub fn wasm_args(&self) -> &[String] { + self.wasm_args.as_ref() + } + + /// Prevents Viceroy from logging to stdout and stderr (note: any logs + /// emitted by the INPUT program will still go to stdout/stderr) + pub fn quiet(&self) -> bool { + self.quiet + } + // Set of experimental wasi modules to link against. pub fn wasi_modules(&self) -> HashSet<ExperimentalModule> { self.experimental_modules.iter().map(|x| x.into()).collect() } } @@ -327,4 +355,39 @@ mod opts_tests { Err(_) => Ok(()), } } + + /// Test that trailing arguments are collected successfully + #[test] + fn trailing_args_are_collected() -> TestResult { + let args = &[ + "dummy-program-name", + &test_file("minimal.wat"), + "--", + "--trailing-arg", + "--trailing-arg-2", + ]; + let opts = Opts::try_parse_from(args)?; + assert_eq!(opts.wasm_args(), &["--trailing-arg", "--trailing-arg-2"]); + Ok(()) + } + + /// Input is still accepted after double-dash. 
This is how the input will be + /// passed by cargo nextest if using Viceroy in run-mode to run tests + #[test] + fn input_accepted_after_double_dash() -> TestResult { + let args = &[ + "dummy-program-name", + "--", + &test_file("minimal.wat"), + "--trailing-arg", + "--trailing-arg-2", + ]; + let opts = match Opts::try_parse_from(args) { + Ok(opts) => opts, + res => panic!("unexpected result: {:?}", res), + }; + assert_eq!(opts.input().to_str().unwrap(), &test_file("minimal.wat")); + assert_eq!(opts.wasm_args(), &["--trailing-arg", "--trailing-arg-2"]); + Ok(()) + } } diff --git a/lib/src/execute.rs b/lib/src/execute.rs index 03060351..5747bc76 100644 --- a/lib/src/execute.rs +++ b/lib/src/execute.rs @@ -1,5 +1,7 @@ //! Guest code execution. +use std::net::Ipv4Addr; + use { crate::{ body::Body, @@ -354,6 +356,55 @@ impl ExecuteCtx { outcome } + + pub async fn run_main(self, program_name: &str, args: &[String]) -> Result<(), anyhow::Error> { + // placeholders for request, result sender channel, and remote IP + let req = Request::get("http://example.com/").body(Body::empty())?; + let req_id = 0; + let (sender, _) = oneshot::channel(); + let remote = Ipv4Addr::LOCALHOST.into(); + + let session = Session::new( + req_id, + req, + sender, + remote, + self.backends.clone(), + self.geolocation.clone(), + self.tls_config.clone(), + self.dictionaries.clone(), + self.config_path.clone(), + self.object_store.clone(), + self.secret_stores.clone(), + ); + + let mut store = create_store(&self, session).map_err(ExecutionError::Context)?; + store.data_mut().wasi().push_arg(program_name)?; + for arg in args { + store.data_mut().wasi().push_arg(arg)?; + } + + let instance = self + .instance_pre + .instantiate_async(&mut store) + .await + .map_err(ExecutionError::Instantiation)?; + + // Pull out the `_start` function, which by convention with WASI is the main entry point for + // an application. 
+ let main_func = instance + .get_typed_func::<(), ()>(&mut store, "_start") + .map_err(ExecutionError::Typechecking)?; + + // Invoke the entrypoint function and collect its exit code + let result = main_func.call_async(&mut store, ()).await; + + // Ensure the downstream response channel is closed, whether or not a response was + // sent during execution. + store.data_mut().close_downstream_response_sender(); + + result + } } fn configure_wasmtime(profiling_strategy: ProfilingStrategy) -> wasmtime::Config { diff --git a/lib/src/linking.rs b/lib/src/linking.rs index 1b3d0e1f..a0609a60 100644 --- a/lib/src/linking.rs +++ b/lib/src/linking.rs @@ -20,7 +20,7 @@ pub struct WasmCtx { } impl WasmCtx { - fn wasi(&mut self) -> &mut WasiCtx { + pub fn wasi(&mut self) -> &mut WasiCtx { &mut self.wasi } @@ -28,7 +28,7 @@ impl WasmCtx { &mut self.wasi_nn } - fn session(&mut self) -> &mut Session { + pub fn session(&mut self) -> &mut Session { &mut self.session } }